This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 64922c6 feat: support lower_bound&&upper_bound for parquet writer (#383)
64922c6 is described below
commit 64922c60aa0f89b452e333252acfae2d15b2bb50
Author: ZENOTME <[email protected]>
AuthorDate: Tue Jul 2 08:46:54 2024 +0800
feat: support lower_bound&&upper_bound for parquet writer (#383)
---
crates/iceberg/src/arrow/schema.rs | 24 +-
crates/iceberg/src/spec/datatypes.rs | 17 +
crates/iceberg/src/spec/schema.rs | 2 +-
.../src/writer/base_writer/data_file_writer.rs | 187 +---
.../src/writer/file_writer/parquet_writer.rs | 1145 ++++++++++++++++----
crates/iceberg/src/writer/mod.rs | 51 -
6 files changed, 968 insertions(+), 458 deletions(-)
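Taken together, the change means a ParquetWriterBuilder is now constructed from an Iceberg SchemaRef (converted to an Arrow schema internally) and the data files it produces carry per-field lower/upper bounds. A minimal usage sketch, assuming a single required long column with field id 0; file_io, location_gen, file_name_gen, and record_batch are placeholders that are not part of this commit:

    use std::collections::HashMap;
    // Types below (Schema, NestedField, Datum, ParquetWriterBuilder, ...) come
    // from the iceberg crate as used elsewhere in this diff.
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![NestedField::required(
                0,
                "col0",
                Type::Primitive(PrimitiveType::Long),
            )
            .into()])
            .build()
            .unwrap(),
    );
    let mut writer = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        schema, // an Iceberg schema, no longer an Arrow schema
        file_io.clone(),
        location_gen,
        file_name_gen,
    )
    .build()
    .await?;
    writer.write(&record_batch).await?;
    let data_file = writer
        .close()
        .await?
        .into_iter()
        .next()
        .unwrap()
        // content/partition must still be filled in by the caller.
        .content(DataContentType::Data)
        .partition(Struct::empty())
        .build()
        .unwrap();
    // New in this commit: per-field-id min/max statistics. If record_batch
    // held the longs 0..1024, the bounds would be:
    assert_eq!(*data_file.lower_bounds(), HashMap::from([(0, Datum::long(0))]));
    assert_eq!(*data_file.upper_bounds(), HashMap::from([(0, Datum::long(1023))]));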
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index 260e144..3102f6d 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -35,6 +35,9 @@ use rust_decimal::prelude::ToPrimitive;
use std::collections::HashMap;
use std::sync::Arc;
+/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
+pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
+
/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
@@ -500,9 +503,10 @@ impl SchemaVisitor for ToArrowSchemaConverter {
_ => unreachable!(),
};
let field = Field::new(
- "entries",
+ DEFAULT_MAP_FIELD_NAME,
DataType::Struct(vec![key_field, value_field].into()),
- map.value_field.required,
+            // The map field is always non-nullable
+ false,
);
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
@@ -562,7 +566,7 @@ impl SchemaVisitor for ToArrowSchemaConverter {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
}
crate::spec::PrimitiveType::Time =>
Ok(ArrowSchemaOrFieldOrType::Type(
- DataType::Time32(TimeUnit::Microsecond),
+ DataType::Time64(TimeUnit::Microsecond),
)),
crate::spec::PrimitiveType::Timestamp =>
Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Microsecond, None),
@@ -659,10 +663,9 @@ mod tests {
let r#struct = DataType::Struct(fields);
let map = DataType::Map(
Arc::new(
- Field::new("entries", r#struct,
false).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "19".to_string(),
- )])),
+ Field::new(DEFAULT_MAP_FIELD_NAME, r#struct,
false).with_metadata(HashMap::from([
+ (PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string()),
+ ])),
),
false,
);
@@ -1024,7 +1027,10 @@ mod tests {
]);
let r#struct = DataType::Struct(fields);
- let map = DataType::Map(Arc::new(Field::new("entries", r#struct,
false)), false);
+ let map = DataType::Map(
+ Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
+ false,
+ );
let fields = Fields::from(vec![
Field::new("aa", DataType::Int32,
false).with_metadata(HashMap::from([(
@@ -1088,7 +1094,7 @@ mod tests {
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
- Field::new("i", DataType::Time32(TimeUnit::Microsecond),
false).with_metadata(
+ Field::new("i", DataType::Time64(TimeUnit::Microsecond),
false).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"9".to_string())]),
),
Field::new(
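For context, after this change an Iceberg map converts to an Arrow map whose entries field is named by DEFAULT_MAP_FIELD_NAME ("key_value") and is always non-nullable, and Iceberg Time now maps to Arrow Time64 with microsecond precision. A rough sketch of the resulting Arrow shape; key_field and value_field stand for the already-converted key/value fields:

    // Shape of the Arrow type produced for an Iceberg map after this change
    // (the PARQUET_FIELD_ID_META_KEY metadata is omitted for brevity).
    let entries = Field::new(
        DEFAULT_MAP_FIELD_NAME,
        DataType::Struct(vec![key_field, value_field].into()),
        false, // the entries field is always non-nullable now
    );
    let arrow_map = DataType::Map(Arc::new(entries), false);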
diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index cc911fa..137a9d1 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -667,6 +667,13 @@ pub struct ListType {
pub element_field: NestedFieldRef,
}
+impl ListType {
+ /// Construct a list type with the given element field.
+ pub fn new(element_field: NestedFieldRef) -> Self {
+ Self { element_field }
+ }
+}
+
/// Module for type serialization/deserialization.
pub(super) mod _serde {
use crate::spec::datatypes::Type::Map;
@@ -782,6 +789,16 @@ pub struct MapType {
pub value_field: NestedFieldRef,
}
+impl MapType {
+ /// Construct a map type with the given key and value fields.
+ pub fn new(key_field: NestedFieldRef, value_field: NestedFieldRef) -> Self
{
+ Self {
+ key_field,
+ value_field,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
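The two constructors added above make nested types easier to build by hand. A small sketch; the field ids 1-3 are arbitrary and chosen only for illustration:

    // list<long> and map<string, long> built with the new constructors.
    let list_ty = ListType::new(
        NestedField::required(1, "element", Type::Primitive(PrimitiveType::Long)).into(),
    );
    let map_ty = MapType::new(
        NestedField::required(2, "key", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::required(3, "value", Type::Primitive(PrimitiveType::Long)).into(),
    );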
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 93edcd7..c76701b 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -1196,7 +1196,7 @@ mod tests {
(schema, record)
}
- fn table_schema_nested() -> Schema {
+ pub fn table_schema_nested() -> Schema {
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![2])
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index 442c9f1..638a905 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -108,10 +108,13 @@ impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
#[cfg(test)]
mod test {
- use std::{collections::HashMap, sync::Arc};
+ use std::sync::Arc;
- use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch,
StructArray};
- use parquet::{arrow::PARQUET_FIELD_ID_META_KEY,
file::properties::WriterProperties};
+ use crate::{
+ spec::{DataContentType, Schema, Struct},
+ Result,
+ };
+ use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
use crate::{
@@ -123,13 +126,12 @@ mod test {
location_generator::{test::MockLocationGenerator,
DefaultFileNameGenerator},
ParquetWriterBuilder,
},
- tests::check_parquet_data_file,
IcebergWriter, IcebergWriterBuilder,
},
};
#[tokio::test]
- async fn test_data_file_writer() -> Result<(), anyhow::Error> {
+ async fn test_parquet_writer() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
@@ -137,181 +139,22 @@ mod test {
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
- // prepare data
- // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
- let schema = {
- let fields = vec![
- arrow_schema::Field::new("col0",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "0".to_string(),
- )])),
- arrow_schema::Field::new(
- "col1",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8,
true).with_metadata(
- HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string())]),
- ),
- arrow_schema::Field::new(
- "col3",
- arrow_schema::DataType::List(Arc::new(
- arrow_schema::Field::new("item",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )])),
- )),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "3".to_string(),
- )])),
- arrow_schema::Field::new(
- "col4",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "4".to_string(),
- )])),
- ];
- Arc::new(arrow_schema::Schema::new(fields))
- };
- let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
- let col1 = Arc::new(StructArray::new(
- vec![
- arrow_schema::Field::new("sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
- None,
- ));
- let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
- "test";
- 1024
- ])) as ArrayRef;
- let col3 = Arc::new({
- let list_parts =
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
- Some(
- vec![Some(1),]
- );
- 1024
- ])
- .into_parts();
- arrow_array::ListArray::new(
-
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )]))),
- list_parts.1,
- list_parts.2,
- list_parts.3,
- )
- }) as ArrayRef;
- let col4 = Arc::new(StructArray::new(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
- vec![Arc::new(StructArray::new(
- vec![
- arrow_schema::Field::new("sub_sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
- None,
- ))],
- None,
- ));
- let to_write =
- RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3,
col4]).unwrap();
-
- // prepare writer
- let pb = ParquetWriterBuilder::new(
+ let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
- to_write.schema(),
+ Arc::new(Schema::builder().build().unwrap()),
file_io.clone(),
location_gen,
file_name_gen,
);
- let mut data_file_writer = DataFileWriterBuilder::new(pb)
+ let mut data_file_writer = DataFileWriterBuilder::new(pw)
.build(DataFileWriterConfig::new(None))
.await?;
- // write
- data_file_writer.write(to_write.clone()).await?;
- let res = data_file_writer.close().await?;
- assert_eq!(res.len(), 1);
- let data_file = res.into_iter().next().unwrap();
-
- // check
- check_parquet_data_file(&file_io, &data_file, &to_write).await;
+ let data_file = data_file_writer.close().await.unwrap();
+ assert_eq!(data_file.len(), 1);
+ assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
+ assert_eq!(data_file[0].content, DataContentType::Data);
+ assert_eq!(data_file[0].partition, Struct::empty());
Ok(())
}
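For completeness, writing through the data file writer built in the test above still follows the same pattern; a sketch assuming `batch` is a RecordBatch matching the writer's schema (the new test only exercises close()):

    data_file_writer.write(batch).await?;
    let data_files = data_file_writer.close().await?;
    assert_eq!(data_files.len(), 1);
    assert_eq!(data_files[0].content, DataContentType::Data);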
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index a67d308..50a507e 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -17,13 +17,12 @@
//! The module contains the file writer for parquet file format.
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::{
- collections::HashMap,
- sync::{atomic::AtomicI64, Arc},
+use crate::arrow::DEFAULT_MAP_FIELD_NAME;
+use crate::spec::{
+ visit_schema, Datum, ListType, MapType, NestedFieldRef, PrimitiveLiteral,
PrimitiveType,
+ Schema, SchemaRef, SchemaVisitor, StructType, Type,
};
-
+use crate::ErrorKind;
use crate::{io::FileIO, io::FileWrite, Result};
use crate::{
io::OutputFile,
@@ -34,8 +33,25 @@ use crate::{
use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
+use itertools::Itertools;
+use parquet::data_type::{
+ BoolType, ByteArrayType, DataType as ParquetDataType, DoubleType,
FixedLenByteArrayType,
+ FloatType, Int32Type, Int64Type,
+};
+use parquet::file::properties::WriterProperties;
+use parquet::file::statistics::TypedStatistics;
use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
-use parquet::{arrow::PARQUET_FIELD_ID_META_KEY,
file::properties::WriterProperties};
+use parquet::{
+ data_type::{ByteArray, FixedLenByteArray},
+ file::statistics::{from_thrift, Statistics},
+};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{
+ collections::HashMap,
+ sync::{atomic::AtomicI64, Arc},
+};
+use uuid::Uuid;
use super::{
location_generator::{FileNameGenerator, LocationGenerator},
@@ -47,7 +63,7 @@ use super::{
#[derive(Clone)]
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
props: WriterProperties,
- schema: ArrowSchemaRef,
+ schema: SchemaRef,
file_io: FileIO,
location_generator: T,
@@ -59,7 +75,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
/// To construct the write result, the schema should contain the
`PARQUET_FIELD_ID_META_KEY` metadata for each field.
pub fn new(
props: WriterProperties,
- schema: ArrowSchemaRef,
+ schema: SchemaRef,
file_io: FileIO,
location_generator: T,
file_name_generator: F,
@@ -78,29 +94,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
type R = ParquetWriter;
async fn build(self) -> crate::Result<Self::R> {
- // Fetch field id from schema
- let field_ids = self
- .schema
- .fields()
- .iter()
- .map(|field| {
- field
- .metadata()
- .get(PARQUET_FIELD_ID_META_KEY)
- .ok_or_else(|| {
- Error::new(
- crate::ErrorKind::Unexpected,
- "Field id not found in arrow schema metadata.",
- )
- })?
- .parse::<i32>()
- .map_err(|err| {
- Error::new(crate::ErrorKind::Unexpected, "Failed to
parse field id.")
- .with_source(err)
- })
- })
- .collect::<crate::Result<Vec<_>>>()?;
-
+ let arrow_schema: ArrowSchemaRef =
Arc::new(self.schema.as_ref().try_into()?);
let written_size = Arc::new(AtomicI64::new(0));
let out_file = self.file_io.new_output(
self.location_generator
@@ -108,76 +102,400 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
)?;
let inner_writer = TrackWriter::new(out_file.writer().await?,
written_size.clone());
let async_writer = AsyncFileWriter::new(inner_writer);
- let writer = AsyncArrowWriter::try_new(async_writer,
self.schema.clone(), Some(self.props))
- .map_err(|err| {
- Error::new(
- crate::ErrorKind::Unexpected,
- "Failed to build parquet writer.",
- )
- .with_source(err)
- })?;
+ let writer =
+ AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(),
Some(self.props))
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to build parquet
writer.")
+ .with_source(err)
+ })?;
Ok(ParquetWriter {
+ schema: self.schema.clone(),
writer,
written_size,
current_row_num: 0,
out_file,
- field_ids,
})
}
}
+struct IndexByParquetPathName {
+ name_to_id: HashMap<String, i32>,
+
+ field_names: Vec<String>,
+
+ field_id: i32,
+}
+
+impl IndexByParquetPathName {
+ pub fn new() -> Self {
+ Self {
+ name_to_id: HashMap::new(),
+ field_names: Vec::new(),
+ field_id: 0,
+ }
+ }
+
+ pub fn get(&self, name: &str) -> Option<&i32> {
+ self.name_to_id.get(name)
+ }
+}
+
+impl SchemaVisitor for IndexByParquetPathName {
+ type T = ();
+
+ fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names.push(field.name.to_string());
+ self.field_id = field.id;
+ Ok(())
+ }
+
+ fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ Ok(())
+ }
+
+ fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names.push(format!("list.{}", field.name));
+ self.field_id = field.id;
+ Ok(())
+ }
+
+ fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ Ok(())
+ }
+
+ fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names
+ .push(format!("{DEFAULT_MAP_FIELD_NAME}.key"));
+ self.field_id = field.id;
+ Ok(())
+ }
+
+ fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ Ok(())
+ }
+
+ fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names
+ .push(format!("{DEFAULT_MAP_FIELD_NAME}.value"));
+ self.field_id = field.id;
+ Ok(())
+ }
+
+ fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ Ok(())
+ }
+
+ fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T>
{
+ Ok(())
+ }
+
+ fn field(&mut self, _field: &NestedFieldRef, _value: Self::T) ->
Result<Self::T> {
+ Ok(())
+ }
+
+ fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) ->
Result<Self::T> {
+ Ok(())
+ }
+
+ fn list(&mut self, _list: &ListType, _value: Self::T) -> Result<Self::T> {
+ Ok(())
+ }
+
+ fn map(&mut self, _map: &MapType, _key_value: Self::T, _value: Self::T) ->
Result<Self::T> {
+ Ok(())
+ }
+
+ fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+ let full_name = self.field_names.iter().map(String::as_str).join(".");
+ let field_id = self.field_id;
+ if let Some(existing_field_id) =
self.name_to_id.get(full_name.as_str()) {
+ return Err(Error::new(ErrorKind::DataInvalid, format!("Invalid
schema: multiple fields for name {full_name}: {field_id} and
{existing_field_id}")));
+ } else {
+ self.name_to_id.insert(full_name, field_id);
+ }
+
+ Ok(())
+ }
+}
+
/// `ParquetWriter` is used to write arrow data into a parquet file on storage.
pub struct ParquetWriter {
+ schema: SchemaRef,
out_file: OutputFile,
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
written_size: Arc<AtomicI64>,
current_row_num: usize,
- field_ids: Vec<i32>,
+}
+
+/// Used to aggregate min and max value of each column.
+struct MinMaxColAggregator {
+ lower_bounds: HashMap<i32, Datum>,
+ upper_bounds: HashMap<i32, Datum>,
+ schema: SchemaRef,
+}
+
+impl MinMaxColAggregator {
+ fn new(schema: SchemaRef) -> Self {
+ Self {
+ lower_bounds: HashMap::new(),
+ upper_bounds: HashMap::new(),
+ schema,
+ }
+ }
+
+ fn update_state<T: ParquetDataType>(
+ &mut self,
+ field_id: i32,
+ state: &TypedStatistics<T>,
+ convert_func: impl Fn(<T as ParquetDataType>::T) -> Result<Datum>,
+ ) {
+ if state.min_is_exact() {
+ let val = convert_func(state.min().clone()).unwrap();
+ self.lower_bounds
+ .entry(field_id)
+ .and_modify(|e| {
+ if *e > val {
+ *e = val.clone()
+ }
+ })
+ .or_insert(val);
+ }
+ if state.max_is_exact() {
+ let val = convert_func(state.max().clone()).unwrap();
+ self.upper_bounds
+ .entry(field_id)
+ .and_modify(|e| {
+ if *e < val {
+ *e = val.clone()
+ }
+ })
+ .or_insert(val);
+ }
+ }
+
+ fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
+ let Some(ty) = self
+ .schema
+ .field_by_id(field_id)
+ .map(|f| f.field_type.as_ref())
+ else {
+ // Following java implementation:
https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
+ // Ignore the field if it is not in schema.
+ return Ok(());
+ };
+ let Type::Primitive(ty) = ty.clone() else {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+                    "Composite type {} is not supported for min/max aggregation.",
+ ty
+ ),
+ ));
+ };
+
+ match (&ty, value) {
+ (PrimitiveType::Boolean, Statistics::Boolean(stat)) => {
+ let convert_func = |v: bool|
Result::<Datum>::Ok(Datum::bool(v));
+ self.update_state::<BoolType>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Int, Statistics::Int32(stat)) => {
+ let convert_func = |v: i32| Result::<Datum>::Ok(Datum::int(v));
+ self.update_state::<Int32Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Long, Statistics::Int64(stat)) => {
+ let convert_func = |v: i64|
Result::<Datum>::Ok(Datum::long(v));
+ self.update_state::<Int64Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Float, Statistics::Float(stat)) => {
+ let convert_func = |v: f32|
Result::<Datum>::Ok(Datum::float(v));
+ self.update_state::<FloatType>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Double, Statistics::Double(stat)) => {
+ let convert_func = |v: f64|
Result::<Datum>::Ok(Datum::double(v));
+ self.update_state::<DoubleType>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::String, Statistics::ByteArray(stat)) => {
+ let convert_func = |v: ByteArray| {
+ Result::<Datum>::Ok(Datum::string(
+ String::from_utf8(v.data().to_vec()).unwrap(),
+ ))
+ };
+ self.update_state::<ByteArrayType>(field_id, &stat,
convert_func)
+ }
+ (PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
+ let convert_func =
+ |v: ByteArray|
Result::<Datum>::Ok(Datum::binary(v.data().to_vec()));
+ self.update_state::<ByteArrayType>(field_id, &stat,
convert_func)
+ }
+ (PrimitiveType::Date, Statistics::Int32(stat)) => {
+ let convert_func = |v: i32|
Result::<Datum>::Ok(Datum::date(v));
+ self.update_state::<Int32Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Time, Statistics::Int64(stat)) => {
+ let convert_func = |v: i64| Datum::time_micros(v);
+ self.update_state::<Int64Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Timestamp, Statistics::Int64(stat)) => {
+ let convert_func = |v: i64|
Result::<Datum>::Ok(Datum::timestamp_micros(v));
+ self.update_state::<Int64Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Timestamptz, Statistics::Int64(stat)) => {
+ let convert_func = |v: i64|
Result::<Datum>::Ok(Datum::timestamptz_micros(v));
+ self.update_state::<Int64Type>(field_id, &stat, convert_func)
+ }
+ (
+ PrimitiveType::Decimal {
+ precision: _,
+ scale: _,
+ },
+ Statistics::ByteArray(stat),
+ ) => {
+ let convert_func = |v: ByteArray| -> Result<Datum> {
+ Result::<Datum>::Ok(Datum::new(
+ ty.clone(),
+ PrimitiveLiteral::Decimal(i128::from_le_bytes(
+ v.data().try_into().unwrap(),
+ )),
+ ))
+ };
+ self.update_state::<ByteArrayType>(field_id, &stat,
convert_func)
+ }
+ (
+ PrimitiveType::Decimal {
+ precision: _,
+ scale: _,
+ },
+ Statistics::Int32(stat),
+ ) => {
+ let convert_func = |v: i32| {
+ Result::<Datum>::Ok(Datum::new(
+ ty.clone(),
+ PrimitiveLiteral::Decimal(i128::from(v)),
+ ))
+ };
+ self.update_state::<Int32Type>(field_id, &stat, convert_func)
+ }
+ (
+ PrimitiveType::Decimal {
+ precision: _,
+ scale: _,
+ },
+ Statistics::Int64(stat),
+ ) => {
+ let convert_func = |v: i64| {
+ Result::<Datum>::Ok(Datum::new(
+ ty.clone(),
+ PrimitiveLiteral::Decimal(i128::from(v)),
+ ))
+ };
+ self.update_state::<Int64Type>(field_id, &stat, convert_func)
+ }
+ (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stat)) => {
+ let convert_func = |v: FixedLenByteArray| {
+ if v.len() != 16 {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Invalid length of uuid bytes.",
+ ));
+ }
+ Ok(Datum::uuid(Uuid::from_bytes(
+ v.data()[..16].try_into().unwrap(),
+ )))
+ };
+ self.update_state::<FixedLenByteArrayType>(field_id, &stat,
convert_func)
+ }
+ (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat))
=> {
+ let convert_func = |v: FixedLenByteArray| {
+ if v.len() != *len as usize {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Invalid length of fixed bytes.",
+ ));
+ }
+ Ok(Datum::fixed(v.data().to_vec()))
+ };
+ self.update_state::<FixedLenByteArrayType>(field_id, &stat,
convert_func)
+ }
+ (ty, value) => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+                    format!("Statistics {} does not match field type {}.", value, ty),
+ ))
+ }
+ }
+ Ok(())
+ }
+
+ fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
+ (self.lower_bounds, self.upper_bounds)
+ }
}
impl ParquetWriter {
fn to_data_file_builder(
- field_ids: &[i32],
+ schema: SchemaRef,
metadata: FileMetaData,
written_size: usize,
file_path: String,
) -> Result<DataFileBuilder> {
- // Only enter here when the file is not empty.
- assert!(!metadata.row_groups.is_empty());
- if field_ids.len() != metadata.row_groups[0].columns.len() {
- return Err(Error::new(
- crate::ErrorKind::Unexpected,
- "Len of field id is not match with len of columns in parquet
metadata.",
- ));
- }
+ let index_by_parquet_path = {
+ let mut visitor = IndexByParquetPathName::new();
+ visit_schema(&schema, &mut visitor)?;
+ visitor
+ };
- let (column_sizes, value_counts, null_value_counts) =
- {
- let mut per_col_size: HashMap<i32, u64> = HashMap::new();
- let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
- let mut per_col_null_val_num: HashMap<i32, u64> =
HashMap::new();
- metadata.row_groups.iter().for_each(|group| {
- group.columns.iter().zip(field_ids.iter()).for_each(
- |(column_chunk, &field_id)| {
- if let Some(column_chunk_metadata) =
&column_chunk.meta_data {
- *per_col_size.entry(field_id).or_insert(0) +=
-
column_chunk_metadata.total_compressed_size as u64;
- *per_col_val_num.entry(field_id).or_insert(0)
+=
- column_chunk_metadata.num_values as u64;
-
*per_col_null_val_num.entry(field_id).or_insert(0_u64) +=
- column_chunk_metadata
- .statistics
- .as_ref()
- .map(|s| s.null_count)
- .unwrap_or(None)
- .unwrap_or(0) as u64;
- }
- },
- )
- });
- (per_col_size, per_col_val_num, per_col_null_val_num)
- };
+ let (column_sizes, value_counts, null_value_counts, (lower_bounds,
upper_bounds)) = {
+ let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+ let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+ let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+ let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+ for row_group in &metadata.row_groups {
+ for column_chunk in row_group.columns.iter() {
+ let Some(column_chunk_metadata) = &column_chunk.meta_data
else {
+ continue;
+ };
+ let physical_type = column_chunk_metadata.type_;
+ let Some(&field_id) =
+
index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join("."))
+ else {
+ // Following java implementation:
https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
+ // Ignore the field if it is not in schema.
+ continue;
+ };
+ *per_col_size.entry(field_id).or_insert(0) +=
+ column_chunk_metadata.total_compressed_size as u64;
+ *per_col_val_num.entry(field_id).or_insert(0) +=
+ column_chunk_metadata.num_values as u64;
+ if let Some(null_count) = column_chunk_metadata
+ .statistics
+ .as_ref()
+ .and_then(|s| s.null_count)
+ {
+ *per_col_null_val_num.entry(field_id).or_insert(0_u64)
+= null_count as u64;
+ }
+ if let Some(statistics) =
&column_chunk_metadata.statistics {
+ min_max_agg.update(
+ field_id,
+ from_thrift(physical_type.try_into()?,
Some(statistics.clone()))?
+ .unwrap(),
+ )?;
+ }
+ }
+ }
+
+ (
+ per_col_size,
+ per_col_val_num,
+ per_col_null_val_num,
+ min_max_agg.produce(),
+ )
+ };
let mut builder = DataFileBuilder::default();
builder
@@ -188,10 +506,11 @@ impl ParquetWriter {
.column_sizes(column_sizes)
.value_counts(value_counts)
.null_value_counts(null_value_counts)
- // # TODO
+ .lower_bounds(lower_bounds)
+ .upper_bounds(upper_bounds)
+ // # TODO(#417)
// - nan_value_counts
- // - lower_bounds
- // - upper_bounds
+ // - distinct_counts
.key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default())
.split_offsets(
metadata
@@ -209,7 +528,7 @@ impl FileWriter for ParquetWriter {
self.current_row_num += batch.num_rows();
self.writer.write(batch).await.map_err(|err| {
Error::new(
- crate::ErrorKind::Unexpected,
+ ErrorKind::Unexpected,
"Failed to write using parquet writer.",
)
.with_source(err)
@@ -219,17 +538,13 @@ impl FileWriter for ParquetWriter {
async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
let metadata = self.writer.close().await.map_err(|err| {
- Error::new(
- crate::ErrorKind::Unexpected,
- "Failed to close parquet writer.",
- )
- .with_source(err)
+ Error::new(ErrorKind::Unexpected, "Failed to close parquet
writer.").with_source(err)
})?;
let written_size =
self.written_size.load(std::sync::atomic::Ordering::Relaxed);
Ok(vec![Self::to_data_file_builder(
- &self.field_ids,
+ self.schema,
metadata,
written_size as usize,
self.out_file.location().to_string(),
@@ -357,15 +672,21 @@ mod tests {
use anyhow::Result;
use arrow_array::types::Int64Type;
use arrow_array::ArrayRef;
+ use arrow_array::BooleanArray;
+ use arrow_array::Int32Array;
use arrow_array::Int64Array;
+ use arrow_array::ListArray;
use arrow_array::RecordBatch;
use arrow_array::StructArray;
+ use arrow_schema::DataType;
+ use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use tempfile::TempDir;
use super::*;
use crate::io::FileIOBuilder;
+ use crate::spec::NestedField;
use crate::spec::Struct;
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
@@ -374,6 +695,131 @@ mod tests {
#[derive(Clone)]
struct TestLocationGen;
+ fn schema_for_all_type() -> Schema {
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::optional(0, "boolean",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ NestedField::optional(1, "int",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "long",
Type::Primitive(PrimitiveType::Long)).into(),
+ NestedField::optional(3, "float",
Type::Primitive(PrimitiveType::Float)).into(),
+ NestedField::optional(4, "double",
Type::Primitive(PrimitiveType::Double)).into(),
+ NestedField::optional(5, "string",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(6, "binary",
Type::Primitive(PrimitiveType::Binary)).into(),
+ NestedField::optional(7, "date",
Type::Primitive(PrimitiveType::Date)).into(),
+ NestedField::optional(8, "time",
Type::Primitive(PrimitiveType::Time)).into(),
+ NestedField::optional(9, "timestamp",
Type::Primitive(PrimitiveType::Timestamp))
+ .into(),
+ NestedField::optional(
+ 10,
+ "timestamptz",
+ Type::Primitive(PrimitiveType::Timestamptz),
+ )
+ .into(),
+ NestedField::optional(
+ 11,
+ "decimal",
+ Type::Primitive(PrimitiveType::Decimal {
+ precision: 10,
+ scale: 5,
+ }),
+ )
+ .into(),
+ NestedField::optional(12, "uuid",
Type::Primitive(PrimitiveType::Uuid)).into(),
+ NestedField::optional(13, "fixed",
Type::Primitive(PrimitiveType::Fixed(10)))
+ .into(),
+ ])
+ .build()
+ .unwrap()
+ }
+
+ fn nested_schema_for_test() -> Schema {
+ // Int, Struct(Int,Int), String, List(Int), Struct(Struct(Int)),
Map(String, List(Int))
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(0, "col0",
Type::Primitive(PrimitiveType::Long)).into(),
+ NestedField::required(
+ 1,
+ "col1",
+ Type::Struct(StructType::new(vec![
+ NestedField::required(5, "col_1_5",
Type::Primitive(PrimitiveType::Long))
+ .into(),
+ NestedField::required(6, "col_1_6",
Type::Primitive(PrimitiveType::Long))
+ .into(),
+ ])),
+ )
+ .into(),
+ NestedField::required(2, "col2",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(
+ 3,
+ "col3",
+ Type::List(ListType::new(
+ NestedField::required(7, "element",
Type::Primitive(PrimitiveType::Long))
+ .into(),
+ )),
+ )
+ .into(),
+ NestedField::required(
+ 4,
+ "col4",
+ Type::Struct(StructType::new(vec![NestedField::required(
+ 8,
+ "col_4_8",
+
Type::Struct(StructType::new(vec![NestedField::required(
+ 9,
+ "col_4_8_9",
+ Type::Primitive(PrimitiveType::Long),
+ )
+ .into()])),
+ )
+ .into()])),
+ )
+ .into(),
+ NestedField::required(
+ 10,
+ "col5",
+ Type::Map(MapType::new(
+ NestedField::required(11, "key",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(
+ 12,
+ "value",
+ Type::List(ListType::new(
+ NestedField::required(
+ 13,
+ "item",
+ Type::Primitive(PrimitiveType::Long),
+ )
+ .into(),
+ )),
+ )
+ .into(),
+ )),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_index_by_parquet_path() {
+ let expect = HashMap::from([
+ ("col0".to_string(), 0),
+ ("col1.col_1_5".to_string(), 5),
+ ("col1.col_1_6".to_string(), 6),
+ ("col2".to_string(), 2),
+ ("col3.list.element".to_string(), 7),
+ ("col4.col_4_8.col_4_8_9".to_string(), 9),
+ ("col5.key_value.key".to_string(), 11),
+ ("col5.key_value.value.list.item".to_string(), 13),
+ ]);
+ let mut visitor = IndexByParquetPathName::new();
+ visit_schema(&nested_schema_for_test(), &mut visitor).unwrap();
+ assert_eq!(visitor.name_to_id, expect);
+ }
+
#[tokio::test]
async fn test_parquet_writer() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
@@ -392,7 +838,7 @@ mod tests {
];
Arc::new(arrow_schema::Schema::new(fields))
};
- let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
+ let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(),
vec![col]).unwrap();
let to_write_null = RecordBatch::try_new(schema.clone(),
vec![null_col]).unwrap();
@@ -400,7 +846,7 @@ mod tests {
// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
- to_write.schema(),
+ Arc::new(to_write.schema().as_ref().try_into().unwrap()),
file_io.clone(),
loccation_gen,
file_name_gen,
@@ -421,6 +867,19 @@ mod tests {
.build()
.unwrap();
+ // check data file
+ assert_eq!(data_file.record_count(), 2048);
+ assert_eq!(*data_file.value_counts(), HashMap::from([(0, 2048)]));
+ assert_eq!(
+ *data_file.lower_bounds(),
+ HashMap::from([(0, Datum::long(0))])
+ );
+ assert_eq!(
+ *data_file.upper_bounds(),
+ HashMap::from([(0, Datum::long(1023))])
+ );
+ assert_eq!(*data_file.null_value_counts(), HashMap::from([(0, 1024)]));
+
// check the written file
let expect_batch = concat_batches(&schema, vec![&to_write,
&to_write_null]).unwrap();
check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
@@ -438,164 +897,153 @@ mod tests {
DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
// prepare data
- // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
- let schema = {
- let fields = vec![
- arrow_schema::Field::new("col0",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "0".to_string(),
- )])),
- arrow_schema::Field::new(
- "col1",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8,
true).with_metadata(
- HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string())]),
- ),
- arrow_schema::Field::new(
- "col3",
- arrow_schema::DataType::List(Arc::new(
- arrow_schema::Field::new("item",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )])),
- )),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "3".to_string(),
- )])),
- arrow_schema::Field::new(
- "col4",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "4".to_string(),
- )])),
- ];
- Arc::new(arrow_schema::Schema::new(fields))
- };
- let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
+ let schema = nested_schema_for_test();
+ let arrow_schema: ArrowSchemaRef =
Arc::new((&schema).try_into().unwrap());
+ let col0 = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
let col1 = Arc::new(StructArray::new(
+ {
+ if let DataType::Struct(fields) =
arrow_schema.field(1).data_type() {
+ fields.clone()
+ } else {
+ unreachable!()
+ }
+ },
vec![
- arrow_schema::Field::new("sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ Arc::new(Int64Array::from_iter_values(0..1024)),
+ Arc::new(Int64Array::from_iter_values(0..1024)),
+ ],
None,
));
- let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
- "test";
- 1024
- ])) as ArrayRef;
+ let col2 = Arc::new(arrow_array::StringArray::from_iter_values(
+ (0..1024).map(|n| n.to_string()),
+ )) as ArrayRef;
let col3 = Arc::new({
- let list_parts =
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
- Some(
- vec![Some(1),]
- );
- 1024
- ])
+ let list_parts =
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(
+ (0..1024).map(|n| Some(vec![Some(n)])),
+ )
.into_parts();
arrow_array::ListArray::new(
-
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )]))),
+ {
+ if let DataType::List(field) =
arrow_schema.field(3).data_type() {
+ field.clone()
+ } else {
+ unreachable!()
+ }
+ },
list_parts.1,
list_parts.2,
list_parts.3,
)
}) as ArrayRef;
let col4 = Arc::new(StructArray::new(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
+ {
+ if let DataType::Struct(fields) =
arrow_schema.field(4).data_type() {
+ fields.clone()
+ } else {
+ unreachable!()
+ }
+ },
vec![Arc::new(StructArray::new(
- vec![
- arrow_schema::Field::new("sub_sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ {
+ if let DataType::Struct(fields) =
arrow_schema.field(4).data_type() {
+ if let DataType::Struct(fields) =
fields[0].data_type() {
+ fields.clone()
+ } else {
+ unreachable!()
+ }
+ } else {
+ unreachable!()
+ }
+ },
+ vec![Arc::new(Int64Array::from_iter_values(0..1024))],
None,
))],
None,
));
- let to_write =
- RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3,
col4]).unwrap();
+ let col5 = Arc::new({
+ let mut map_array_builder = arrow_array::builder::MapBuilder::new(
+ None,
+ arrow_array::builder::StringBuilder::new(),
+
arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
+ Int64Type,
+ >::new()),
+ );
+ for i in 0..1024 {
+ map_array_builder.keys().append_value(i.to_string());
+ map_array_builder
+ .values()
+ .append_value(vec![Some(i as i64); i + 1]);
+ map_array_builder.append(true)?;
+ }
+ let (_, offset_buffer, struct_array, null_buffer, ordered) =
+ map_array_builder.finish().into_parts();
+ let struct_array = {
+ let (_, mut arrays, nulls) = struct_array.into_parts();
+ let list_array = {
+ let list_array = arrays[1]
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap()
+ .clone();
+ let (_, offsets, array, nulls) = list_array.into_parts();
+ let list_field = {
+ if let DataType::Map(map_field, _) =
arrow_schema.field(5).data_type() {
+ if let DataType::Struct(fields) =
map_field.data_type() {
+ if let DataType::List(list_field) =
fields[1].data_type() {
+ list_field.clone()
+ } else {
+ unreachable!()
+ }
+ } else {
+ unreachable!()
+ }
+ } else {
+ unreachable!()
+ }
+ };
+ ListArray::new(list_field, offsets, array, nulls)
+ };
+ arrays[1] = Arc::new(list_array) as ArrayRef;
+ StructArray::new(
+ {
+ if let DataType::Map(map_field, _) =
arrow_schema.field(5).data_type() {
+ if let DataType::Struct(fields) =
map_field.data_type() {
+ fields.clone()
+ } else {
+ unreachable!()
+ }
+ } else {
+ unreachable!()
+ }
+ },
+ arrays,
+ nulls,
+ )
+ };
+ arrow_array::MapArray::new(
+ {
+ if let DataType::Map(map_field, _) =
arrow_schema.field(5).data_type() {
+ map_field.clone()
+ } else {
+ unreachable!()
+ }
+ },
+ offset_buffer,
+ struct_array,
+ null_buffer,
+ ordered,
+ )
+ }) as ArrayRef;
+ let to_write = RecordBatch::try_new(
+ arrow_schema.clone(),
+ vec![col0, col1, col2, col3, col4, col5],
+ )
+ .unwrap();
// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
- to_write.schema(),
+ Arc::new(schema),
file_io.clone(),
location_gen,
file_name_gen,
@@ -615,6 +1063,253 @@ mod tests {
.build()
.unwrap();
+ // check data file
+ assert_eq!(data_file.record_count(), 1024);
+ assert_eq!(
+ *data_file.value_counts(),
+ HashMap::from([
+ (0, 1024),
+ (5, 1024),
+ (6, 1024),
+ (2, 1024),
+ (7, 1024),
+ (9, 1024),
+ (11, 1024),
+ (13, (1..1025).sum()),
+ ])
+ );
+ assert_eq!(
+ *data_file.lower_bounds(),
+ HashMap::from([
+ (0, Datum::long(0)),
+ (5, Datum::long(0)),
+ (6, Datum::long(0)),
+ (2, Datum::string("0")),
+ (7, Datum::long(0)),
+ (9, Datum::long(0)),
+ (11, Datum::string("0")),
+ (13, Datum::long(0))
+ ])
+ );
+ assert_eq!(
+ *data_file.upper_bounds(),
+ HashMap::from([
+ (0, Datum::long(1023)),
+ (5, Datum::long(1023)),
+ (6, Datum::long(1023)),
+ (2, Datum::string("999")),
+ (7, Datum::long(1023)),
+ (9, Datum::long(1023)),
+ (11, Datum::string("999")),
+ (13, Datum::long(1023))
+ ])
+ );
+
+ // check the written file
+ check_parquet_data_file(&file_io, &data_file, &to_write).await;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_all_type_for_write() -> Result<()> {
+ let temp_dir = TempDir::new().unwrap();
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let loccation_gen =
+
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+ let file_name_gen =
+ DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
+
+ // prepare data
+ // generate iceberg schema for all type
+ let schema = schema_for_all_type();
+ let arrow_schema: ArrowSchemaRef =
Arc::new((&schema).try_into().unwrap());
+ let col0 = Arc::new(BooleanArray::from(vec![
+ Some(true),
+ Some(false),
+ None,
+ Some(true),
+ ])) as ArrayRef;
+ let col1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), None,
Some(4)])) as ArrayRef;
+ let col2 = Arc::new(Int64Array::from(vec![Some(1), Some(2), None,
Some(4)])) as ArrayRef;
+ let col3 = Arc::new(arrow_array::Float32Array::from(vec![
+ Some(0.5),
+ Some(2.0),
+ None,
+ Some(3.5),
+ ])) as ArrayRef;
+ let col4 = Arc::new(arrow_array::Float64Array::from(vec![
+ Some(0.5),
+ Some(2.0),
+ None,
+ Some(3.5),
+ ])) as ArrayRef;
+ let col5 = Arc::new(arrow_array::StringArray::from(vec![
+ Some("a"),
+ Some("b"),
+ None,
+ Some("d"),
+ ])) as ArrayRef;
+ let col6 = Arc::new(arrow_array::LargeBinaryArray::from_opt_vec(vec![
+ Some(b"one"),
+ None,
+ Some(b""),
+ Some(b"zzzz"),
+ ])) as ArrayRef;
+ let col7 = Arc::new(arrow_array::Date32Array::from(vec![
+ Some(0),
+ Some(1),
+ None,
+ Some(3),
+ ])) as ArrayRef;
+ let col8 = Arc::new(arrow_array::Time64MicrosecondArray::from(vec![
+ Some(0),
+ Some(1),
+ None,
+ Some(3),
+ ])) as ArrayRef;
+ let col9 = Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
+ Some(0),
+ Some(1),
+ None,
+ Some(3),
+ ])) as ArrayRef;
+ let col10 = Arc::new(
+ arrow_array::TimestampMicrosecondArray::from(vec![Some(0),
Some(1), None, Some(3)])
+ .with_timezone_utc(),
+ ) as ArrayRef;
+ let col11 = Arc::new(
+ arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None,
Some(100)])
+ .with_precision_and_scale(10, 5)
+ .unwrap(),
+ ) as ArrayRef;
+ let col12 = Arc::new(
+ arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+ vec![
+ Some(Uuid::from_u128(0).as_bytes().to_vec()),
+ Some(Uuid::from_u128(1).as_bytes().to_vec()),
+ None,
+ Some(Uuid::from_u128(3).as_bytes().to_vec()),
+ ]
+ .into_iter(),
+ 16,
+ )
+ .unwrap(),
+ ) as ArrayRef;
+ let col13 = Arc::new(
+ arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+ vec![
+ Some(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
+ Some(vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]),
+ None,
+ Some(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]),
+ ]
+ .into_iter(),
+ 10,
+ )
+ .unwrap(),
+ ) as ArrayRef;
+ let to_write = RecordBatch::try_new(
+ arrow_schema.clone(),
+ vec![
+ col0, col1, col2, col3, col4, col5, col6, col7, col8, col9,
col10, col11, col12,
+ col13,
+ ],
+ )
+ .unwrap();
+
+ // write data
+ let mut pw = ParquetWriterBuilder::new(
+ WriterProperties::builder().build(),
+ Arc::new(schema),
+ file_io.clone(),
+ loccation_gen,
+ file_name_gen,
+ )
+ .build()
+ .await?;
+ pw.write(&to_write).await?;
+ let res = pw.close().await?;
+ assert_eq!(res.len(), 1);
+ let data_file = res
+ .into_iter()
+ .next()
+ .unwrap()
+            // Put in dummy fields so the build succeeds.
+ .content(crate::spec::DataContentType::Data)
+ .partition(Struct::empty())
+ .build()
+ .unwrap();
+
+ // check data file
+ assert_eq!(data_file.record_count(), 4);
+ assert!(data_file.value_counts().iter().all(|(_, &v)| { v == 4 }));
+ assert!(data_file
+ .null_value_counts()
+ .iter()
+ .all(|(_, &v)| { v == 1 }));
+ assert_eq!(
+ *data_file.lower_bounds(),
+ HashMap::from([
+ (0, Datum::bool(false)),
+ (1, Datum::int(1)),
+ (2, Datum::long(1)),
+ (3, Datum::float(0.5)),
+ (4, Datum::double(0.5)),
+ (5, Datum::string("a")),
+ (6, Datum::binary(vec![])),
+ (7, Datum::date(0)),
+ (8, Datum::time_micros(0).unwrap()),
+ (9, Datum::timestamp_micros(0)),
+ (10, Datum::timestamptz_micros(0)),
+ (
+ 11,
+ Datum::new(
+ PrimitiveType::Decimal {
+ precision: 10,
+ scale: 5
+ },
+ PrimitiveLiteral::Decimal(1)
+ )
+ ),
+ (12, Datum::uuid(Uuid::from_u128(0))),
+ (13, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
+ ])
+ );
+ assert_eq!(
+ *data_file.upper_bounds(),
+ HashMap::from([
+ (0, Datum::bool(true)),
+ (1, Datum::int(4)),
+ (2, Datum::long(4)),
+ (3, Datum::float(3.5)),
+ (4, Datum::double(3.5)),
+ (5, Datum::string("d")),
+ (6, Datum::binary(vec![122, 122, 122, 122])),
+ (7, Datum::date(3)),
+ (8, Datum::time_micros(3).unwrap()),
+ (9, Datum::timestamp_micros(3)),
+ (10, Datum::timestamptz_micros(3)),
+ (
+ 11,
+ Datum::new(
+ PrimitiveType::Decimal {
+ precision: 10,
+ scale: 5
+ },
+ PrimitiveLiteral::Decimal(100)
+ )
+ ),
+ (12, Datum::uuid(Uuid::from_u128(3))),
+ (
+ 13,
+ Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30])
+ ),
+ ])
+ );
+
// check the written file
check_parquet_data_file(&file_io, &data_file, &to_write).await;
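Conceptually, ParquetWriter::to_data_file_builder now walks every row group's column-chunk statistics, maps each parquet column path back to an Iceberg field id via IndexByParquetPathName, and folds the per-chunk min/max into per-field Datum bounds. A simplified sketch of that folding step, restricted to long columns; the real MinMaxColAggregator in this commit covers all primitive types and reads parquet TypedStatistics rather than plain values:

    use std::collections::HashMap;
    // Hypothetical helper mirroring MinMaxColAggregator::update_state for i64;
    // Datum is iceberg::spec::Datum, which is comparable via PartialOrd.
    fn fold_bounds(
        lower: &mut HashMap<i32, Datum>,
        upper: &mut HashMap<i32, Datum>,
        field_id: i32,
        chunk_min: i64,
        chunk_max: i64,
    ) {
        let min = Datum::long(chunk_min);
        let max = Datum::long(chunk_max);
        // Keep the smallest min seen so far for this field id.
        lower
            .entry(field_id)
            .and_modify(|e| {
                if *e > min {
                    *e = min.clone()
                }
            })
            .or_insert(min);
        // Keep the largest max seen so far for this field id.
        upper
            .entry(field_id)
            .and_modify(|e| {
                if *e < max {
                    *e = max.clone()
                }
            })
            .or_insert(max);
    }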
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 5f3ae55..06b763d 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -127,62 +127,11 @@ mod tests {
let input_content = input_file.read().await.unwrap();
let reader_builder =
ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
- let metadata = reader_builder.metadata().clone();
// check data
let reader = reader_builder.build().unwrap();
let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
let res = concat_batches(&batch.schema(), &batches).unwrap();
assert_eq!(*batch, res);
-
- // check metadata
- let expect_column_num = batch.num_columns();
-
- assert_eq!(
- data_file.record_count,
- metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64
- );
-
- assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);
-
- assert_eq!(data_file.column_sizes.len(), expect_column_num);
- data_file.column_sizes.iter().for_each(|(&k, &v)| {
- let expect = metadata
- .row_groups()
- .iter()
- .map(|group| group.column(k as usize).compressed_size())
- .sum::<i64>() as u64;
- assert_eq!(v, expect);
- });
-
- assert_eq!(data_file.value_counts.len(), expect_column_num);
- data_file.value_counts.iter().for_each(|(_, &v)| {
- let expect = metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64;
- assert_eq!(v, expect);
- });
-
- assert_eq!(data_file.null_value_counts.len(), expect_column_num);
- data_file.null_value_counts.iter().for_each(|(&k, &v)| {
- let expect = batch.column(k as usize).null_count() as u64;
- assert_eq!(v, expect);
- });
-
- assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
- data_file
- .split_offsets
- .iter()
- .enumerate()
- .for_each(|(i, &v)| {
- let expect = metadata.row_groups()[i].file_offset().unwrap();
- assert_eq!(v, expect);
- });
}
}