This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3cc1a685 feat: Add conversion from `FileMetaData` to `ParquetMetadata` (#1074)
3cc1a685 is described below
commit 3cc1a685743142ddea04c81ebc4be0ec6a34168a
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Mar 13 22:38:53 2025 -0400
feat: Add conversion from `FileMetaData` to `ParquetMetadata` (#1074)
## Which issue does this PR close?
- Closes #1033 and #1004.
## What changes are included in this PR?
Adds a conversion from the thrift `FileMetaData` struct to `ParquetMetaData`: the metadata is serialized with `TCompactOutputProtocol` and parsed back with `ParquetMetaDataReader::decode_metadata`.
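A minimal standalone sketch of that round-trip (imports mirror the patch below; error handling is collapsed to `Box<dyn std::error::Error>` here, whereas the patch wraps failures in iceberg's own `Error` type):

```rust
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::format::FileMetaData;
use parquet::thrift::{TCompactOutputProtocol, TSerializable};
use thrift::protocol::TOutputProtocol;

fn thrift_to_parquet_metadata(
    file_metadata: FileMetaData,
) -> Result<ParquetMetaData, Box<dyn std::error::Error>> {
    // Serialize the thrift struct into a compact-protocol byte buffer.
    let mut buffer = Vec::new();
    {
        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
        file_metadata.write_to_out_protocol(&mut protocol)?;
        protocol.flush()?;
    }
    // Parse the bytes back into the parquet crate's rich metadata type.
    Ok(ParquetMetaDataReader::decode_metadata(&buffer)?)
}
```

This avoids re-implementing the per-column statistics walk by hand: once a `ParquetMetaData` is available, the parquet crate's typed accessors replace the manual `from_thrift` conversions that the old `to_data_file_builder` performed.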
---
Cargo.lock | 30 ++++-
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 1 +
.../src/writer/file_writer/parquet_writer.rs | 127 +++++++--------------
4 files changed, 70 insertions(+), 89 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index cf4a23df..69f07c52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2676,6 +2676,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+[[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
[[package]]
name = "hermit-abi"
version = "0.4.0"
@@ -2980,6 +2986,7 @@ dependencies = [
"serde_with",
"tempfile",
"tera",
+ "thrift",
"tokio",
"typed-builder 0.20.0",
"url",
@@ -3947,6 +3954,16 @@ dependencies = [
"libm",
]
+[[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi 0.3.9",
+ "libc",
+]
+
[[package]]
name = "num_enum"
version = "0.7.3"
@@ -4408,7 +4425,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
dependencies = [
"cfg-if",
"concurrent-queue",
- "hermit-abi",
+ "hermit-abi 0.4.0",
"pin-project-lite",
"rustix 0.38.44",
"tracing",
@@ -5972,6 +5989,15 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
[[package]]
name = "thrift"
version = "0.17.0"
@@ -5980,7 +6006,9 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
dependencies = [
"byteorder",
"integer-encoding 3.0.4",
+ "log",
"ordered-float 2.10.1",
+ "threadpool",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index adfbda16..850946c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,6 +94,7 @@ serde_json = "1.0.138"
serde_repr = "0.1.16"
serde_with = "3.4"
tempfile = "3.18"
+thrift = "0.17.0"
tokio = { version = "1.36", default-features = false }
typed-builder = "0.20"
url = "2.5.4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7320c455..bb4c26ab 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -79,6 +79,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
+thrift = { workspace = true }
tokio = { workspace = true, optional = true, features = ["sync"] }
typed-builder = { workspace = true }
url = { workspace = true }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bed9cc3d..333314a0 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -28,10 +28,12 @@ use itertools::Itertools;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::arrow::AsyncArrowWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::{from_thrift, Statistics};
+use parquet::file::statistics::Statistics;
use parquet::format::FileMetaData;
+use parquet::thrift::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TOutputProtocol;
use super::location_generator::{FileNameGenerator, LocationGenerator};
use super::track_writer::TrackWriter;
@@ -352,89 +354,27 @@ impl ParquetWriter {
Ok(data_files)
}
- fn to_data_file_builder(
- schema: SchemaRef,
- metadata: FileMetaData,
- written_size: usize,
- file_path: String,
- ) -> Result<DataFileBuilder> {
- let index_by_parquet_path = {
- let mut visitor = IndexByParquetPathName::new();
- visit_schema(&schema, &mut visitor)?;
- visitor
- };
-
- let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
- let mut per_col_size: HashMap<i32, u64> = HashMap::new();
- let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
- let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
- let mut min_max_agg = MinMaxColAggregator::new(schema);
-
- for row_group in &metadata.row_groups {
- for column_chunk in row_group.columns.iter() {
- let Some(column_chunk_metadata) = &column_chunk.meta_data else {
- continue;
- };
- let physical_type = column_chunk_metadata.type_;
- let Some(&field_id) =
- index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join("."))
- else {
- // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
- // Ignore the field if it is not in schema.
- continue;
- };
- *per_col_size.entry(field_id).or_insert(0) +=
- column_chunk_metadata.total_compressed_size as u64;
- *per_col_val_num.entry(field_id).or_insert(0) +=
- column_chunk_metadata.num_values as u64;
- if let Some(null_count) = column_chunk_metadata
- .statistics
- .as_ref()
- .and_then(|s| s.null_count)
- {
- *per_col_null_val_num.entry(field_id).or_insert(0_u64) += null_count as u64;
- }
- if let Some(statistics) = &column_chunk_metadata.statistics {
- min_max_agg.update(
- field_id,
- from_thrift(physical_type.try_into()?, Some(statistics.clone()))?
- .unwrap(),
- )?;
- }
- }
- }
+ fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
+ let mut buffer = Vec::new();
+ {
+ let mut protocol = TCompactOutputProtocol::new(&mut buffer);
+ file_metadata
+ .write_to_out_protocol(&mut protocol)
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to write parquet
metadata")
+ .with_source(err)
+ })?;
+
+ protocol.flush().map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to flush
protocol").with_source(err)
+ })?;
+ }
- (
- per_col_size,
- per_col_val_num,
- per_col_null_val_num,
- min_max_agg.produce(),
- )
- };
+ let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to decode parquet
metadata").with_source(err)
+ })?;
- let mut builder = DataFileBuilder::default();
- builder
- .file_path(file_path)
- .file_format(DataFileFormat::Parquet)
- .record_count(metadata.num_rows as u64)
- .file_size_in_bytes(written_size as u64)
- .column_sizes(column_sizes)
- .value_counts(value_counts)
- .null_value_counts(null_value_counts)
- .lower_bounds(lower_bounds)
- .upper_bounds(upper_bounds)
- // # TODO(#417)
- // - nan_value_counts
- // - distinct_counts
- .key_metadata(metadata.footer_signing_key_metadata)
- .split_offsets(
- metadata
- .row_groups
- .iter()
- .filter_map(|group| group.file_offset)
- .collect(),
- );
- Ok(builder)
+ Ok(parquet_metadata)
}
/// `ParquetMetadata` to data file builder
@@ -551,19 +491,30 @@ impl FileWriter for ParquetWriter {
Ok(())
}
- async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
- let Some(writer) = self.inner_writer else {
- return Ok(vec![]);
+ async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+ let writer = match self.inner_writer.take() {
+ Some(writer) => writer,
+ None => return Ok(vec![]),
};
+
let metadata = writer.close().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "Failed to close parquet
writer.").with_source(err)
})?;
let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
- Ok(vec![Self::to_data_file_builder(
+ let parquet_metadata =
+ Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to convert metadata from thrift to parquet.",
+ )
+ .with_source(err)
+ })?);
+
+ Ok(vec![Self::parquet_to_data_file_builder(
self.schema,
- metadata,
+ parquet_metadata,
written_size as usize,
self.out_file.location().to_string(),
)?])