This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cc1a685 feat: Add conversion from `FileMetaData` to `ParquetMetadata` (#1074)
3cc1a685 is described below

commit 3cc1a685743142ddea04c81ebc4be0ec6a34168a
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Mar 13 22:38:53 2025 -0400

    feat: Add conversion from `FileMetaData` to `ParquetMetadata` (#1074)
    
    ## Which issue does this PR close?
    
    - Closes #1033 and #1004.
    
    ## What changes are included in this PR?
    
    Add conversion from `FileMetaData` to `ParquetMetaData` by re-encoding
    the thrift struct with the compact protocol and decoding the bytes via
    `ParquetMetaDataReader::decode_metadata`, as sketched below.
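    
    A minimal sketch of the round-trip, assuming the `parquet` and `thrift`
    crates used by this PR (error handling simplified to `Box<dyn Error>`
    here; the actual method maps failures into iceberg's `Error` type):
    
    ```rust
    use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
    use parquet::format::FileMetaData;
    use parquet::thrift::{TCompactOutputProtocol, TSerializable};
    use thrift::protocol::TOutputProtocol;
    
    fn thrift_to_parquet_metadata(
        file_metadata: FileMetaData,
    ) -> Result<ParquetMetaData, Box<dyn std::error::Error>> {
        let mut buffer = Vec::new();
        {
            // Re-encode the thrift struct with the compact protocol.
            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
            file_metadata.write_to_out_protocol(&mut protocol)?;
            protocol.flush()?;
        }
        // Decode the bytes into the parquet crate's `ParquetMetaData`.
        Ok(ParquetMetaDataReader::decode_metadata(&buffer)?)
    }
    ```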
    
---
 Cargo.lock                                         |  30 ++++-
 Cargo.toml                                         |   1 +
 crates/iceberg/Cargo.toml                          |   1 +
 .../src/writer/file_writer/parquet_writer.rs       | 127 +++++++--------------
 4 files changed, 70 insertions(+), 89 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cf4a23df..69f07c52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2676,6 +2676,12 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
+[[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
 [[package]]
 name = "hermit-abi"
 version = "0.4.0"
@@ -2980,6 +2986,7 @@ dependencies = [
  "serde_with",
  "tempfile",
  "tera",
+ "thrift",
  "tokio",
  "typed-builder 0.20.0",
  "url",
@@ -3947,6 +3954,16 @@ dependencies = [
  "libm",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi 0.3.9",
+ "libc",
+]
+
 [[package]]
 name = "num_enum"
 version = "0.7.3"
@@ -4408,7 +4425,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
 dependencies = [
  "cfg-if",
  "concurrent-queue",
- "hermit-abi",
+ "hermit-abi 0.4.0",
  "pin-project-lite",
  "rustix 0.38.44",
  "tracing",
@@ -5972,6 +5989,15 @@ dependencies = [
  "once_cell",
 ]
 
+[[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
 [[package]]
 name = "thrift"
 version = "0.17.0"
@@ -5980,7 +6006,9 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
 dependencies = [
  "byteorder",
  "integer-encoding 3.0.4",
+ "log",
  "ordered-float 2.10.1",
+ "threadpool",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index adfbda16..850946c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,6 +94,7 @@ serde_json = "1.0.138"
 serde_repr = "0.1.16"
 serde_with = "3.4"
 tempfile = "3.18"
+thrift = "0.17.0"  
 tokio = { version = "1.36", default-features = false }
 typed-builder = "0.20"
 url = "2.5.4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7320c455..bb4c26ab 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -79,6 +79,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+thrift = { workspace = true }
 tokio = { workspace = true, optional = true, features = ["sync"] }
 typed-builder = { workspace = true }
 url = { workspace = true }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index bed9cc3d..333314a0 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -28,10 +28,12 @@ use itertools::Itertools;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::{from_thrift, Statistics};
+use parquet::file::statistics::Statistics;
 use parquet::format::FileMetaData;
+use parquet::thrift::{TCompactOutputProtocol, TSerializable};
+use thrift::protocol::TOutputProtocol;
 
 use super::location_generator::{FileNameGenerator, LocationGenerator};
 use super::track_writer::TrackWriter;
@@ -352,89 +354,27 @@ impl ParquetWriter {
         Ok(data_files)
     }
 
-    fn to_data_file_builder(
-        schema: SchemaRef,
-        metadata: FileMetaData,
-        written_size: usize,
-        file_path: String,
-    ) -> Result<DataFileBuilder> {
-        let index_by_parquet_path = {
-            let mut visitor = IndexByParquetPathName::new();
-            visit_schema(&schema, &mut visitor)?;
-            visitor
-        };
-
-        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
-            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
-            let mut min_max_agg = MinMaxColAggregator::new(schema);
-
-            for row_group in &metadata.row_groups {
-                for column_chunk in row_group.columns.iter() {
-                    let Some(column_chunk_metadata) = &column_chunk.meta_data else {
-                        continue;
-                    };
-                    let physical_type = column_chunk_metadata.type_;
-                    let Some(&field_id) =
-                        index_by_parquet_path.get(&column_chunk_metadata.path_in_schema.join("."))
-                    else {
-                        // Following java implementation: https://github.com/apache/iceberg/blob/29a2c456353a6120b8c882ed2ab544975b168d7b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L163
-                        // Ignore the field if it is not in schema.
-                        continue;
-                    };
-                    *per_col_size.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.total_compressed_size as u64;
-                    *per_col_val_num.entry(field_id).or_insert(0) +=
-                        column_chunk_metadata.num_values as u64;
-                    if let Some(null_count) = column_chunk_metadata
-                        .statistics
-                        .as_ref()
-                        .and_then(|s| s.null_count)
-                    {
-                        *per_col_null_val_num.entry(field_id).or_insert(0_u64) += null_count as u64;
-                    }
-                    if let Some(statistics) = &column_chunk_metadata.statistics {
-                        min_max_agg.update(
-                            field_id,
-                            from_thrift(physical_type.try_into()?, Some(statistics.clone()))?
-                                .unwrap(),
-                        )?;
-                    }
-                }
-            }
+    fn thrift_to_parquet_metadata(&self, file_metadata: FileMetaData) -> Result<ParquetMetaData> {
+        let mut buffer = Vec::new();
+        {
+            let mut protocol = TCompactOutputProtocol::new(&mut buffer);
+            file_metadata
+                .write_to_out_protocol(&mut protocol)
+                .map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "Failed to write parquet metadata")
+                        .with_source(err)
+                })?;
+
+            protocol.flush().map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to flush protocol").with_source(err)
+            })?;
+        }
 
-            (
-                per_col_size,
-                per_col_val_num,
-                per_col_null_val_num,
-                min_max_agg.produce(),
-            )
-        };
+        let parquet_metadata = ParquetMetaDataReader::decode_metadata(&buffer).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Failed to decode parquet metadata").with_source(err)
+        })?;
 
-        let mut builder = DataFileBuilder::default();
-        builder
-            .file_path(file_path)
-            .file_format(DataFileFormat::Parquet)
-            .record_count(metadata.num_rows as u64)
-            .file_size_in_bytes(written_size as u64)
-            .column_sizes(column_sizes)
-            .value_counts(value_counts)
-            .null_value_counts(null_value_counts)
-            .lower_bounds(lower_bounds)
-            .upper_bounds(upper_bounds)
-            // # TODO(#417)
-            // - nan_value_counts
-            // - distinct_counts
-            .key_metadata(metadata.footer_signing_key_metadata)
-            .split_offsets(
-                metadata
-                    .row_groups
-                    .iter()
-                    .filter_map(|group| group.file_offset)
-                    .collect(),
-            );
-        Ok(builder)
+        Ok(parquet_metadata)
     }
 
     /// `ParquetMetadata` to data file builder
@@ -551,19 +491,30 @@ impl FileWriter for ParquetWriter {
         Ok(())
     }
 
-    async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
-        let Some(writer) = self.inner_writer else {
-            return Ok(vec![]);
+    async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+        let writer = match self.inner_writer.take() {
+            Some(writer) => writer,
+            None => return Ok(vec![]),
         };
+
         let metadata = writer.close().await.map_err(|err| {
             Error::new(ErrorKind::Unexpected, "Failed to close parquet 
writer.").with_source(err)
         })?;
 
         let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
 
-        Ok(vec![Self::to_data_file_builder(
+        let parquet_metadata =
+            Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to convert metadata from thrift to parquet.",
+                )
+                .with_source(err)
+            })?);
+
+        Ok(vec![Self::parquet_to_data_file_builder(
             self.schema,
-            metadata,
+            parquet_metadata,
             written_size as usize,
             self.out_file.location().to_string(),
         )?])
