This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fcc88920 feat: Add summary functionality to `SnapshotProduceAction` (#1139)
fcc88920 is described below

commit fcc88920f52dbae53257757e2d33825bea4b51a9
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Apr 2 19:30:15 2025 -0400

    feat: Add summary functionality to `SnapshotProduceAction` (#1139)
    
    ## Which issue does this PR close?
    
    
    - Closes #724.
    
    ## What changes are included in this PR?
    Added summary functionality to the snapshot produce action.
---
 crates/iceberg/src/spec/mod.rs              |  1 +
 crates/iceberg/src/spec/snapshot_summary.rs | 17 ++++---
 crates/iceberg/src/spec/table_metadata.rs   |  5 +++
 crates/iceberg/src/transaction/snapshot.rs  | 70 ++++++++++++++++++++++++-----
 4 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index a7fe9a39..5cf2afa1 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -41,6 +41,7 @@ pub use manifest_list::*;
 pub use partition::*;
 pub use schema::*;
 pub use snapshot::*;
+pub use snapshot_summary::*;
 pub use sort::*;
 pub use statistic_file::*;
 pub use table_metadata::*;
diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs
index fae04863..05f9fb8e 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -48,8 +48,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size";
 const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
 const CHANGED_PARTITION_PREFIX: &str = "partitions.";
 
+/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
+/// It gathers metrics about added or removed data files and manifests, and tracks
+/// partition-specific updates.
 #[derive(Default)]
-#[allow(dead_code)]
 pub struct SnapshotSummaryCollector {
     metrics: UpdateMetrics,
     partition_metrics: HashMap<String, UpdateMetrics>,
@@ -58,17 +60,19 @@ pub struct SnapshotSummaryCollector {
     trust_partition_metrics: bool,
 }
 
-#[allow(dead_code)]
 impl SnapshotSummaryCollector {
-    // Set properties
+    /// Set properties for snapshot summary
     pub fn set(&mut self, key: &str, value: &str) {
         self.properties.insert(key.to_string(), value.to_string());
     }
 
+    /// Sets the limit for including partition summaries. Summaries are not
+    /// included if the number of partitions is exceeded.
     pub fn set_partition_summary_limit(&mut self, limit: u64) {
         self.max_changed_partitions_for_summaries = limit;
     }
 
+    /// Adds a data file to the summary collector
     pub fn add_file(
         &mut self,
         data_file: &DataFile,
@@ -81,6 +85,7 @@ impl SnapshotSummaryCollector {
         }
     }
 
+    /// Removes a data file from the summary collector
     pub fn remove_file(
         &mut self,
         data_file: &DataFile,
@@ -93,12 +98,14 @@ impl SnapshotSummaryCollector {
         }
     }
 
+    /// Adds a manifest to the summary collector
     pub fn add_manifest(&mut self, manifest: &ManifestFile) {
         self.trust_partition_metrics = false;
         self.partition_metrics.clear();
         self.metrics.add_manifest(manifest);
     }
 
+    /// Updates partition-specific metrics for a data file.
     pub fn update_partition_metrics(
         &mut self,
         schema: SchemaRef,
@@ -116,6 +123,7 @@ impl SnapshotSummaryCollector {
         }
     }
 
+    /// Merges another `SnapshotSummaryCollector` into the current one
     pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
         self.metrics.merge(&summary.metrics);
         self.properties.extend(summary.properties);
@@ -133,6 +141,7 @@ impl SnapshotSummaryCollector {
         }
     }
 
+    /// Builds final map of summaries
     pub fn build(&self) -> HashMap<String, String> {
         let mut properties = self.metrics.to_map();
         let changed_partitions_count = self.partition_metrics.len() as u64;
@@ -507,8 +516,6 @@ fn update_totals(
         .insert(total_property.to_string(), new_total.to_string());
 }
 
-// TODO: ancestors of function
-
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index e9e99605..59d69e0f 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -77,6 +77,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo
 /// Default value for max number of previous versions to keep.
 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
 
+/// Property key for max number of partitions to keep summary stats for.
+pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
+/// Default value for the max number of partitions to keep summary stats for.
+pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
+
 /// Reserved Iceberg table properties list.
 ///
 /// Reserved table properties are only used to control behaviors when creating or updating a
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index e1a2fa0d..f266491d 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -24,9 +24,11 @@ use uuid::Uuid;
 use crate::error::Result;
 use crate::io::OutputFile;
 use crate::spec::{
-    DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
-    ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
-    StructType, Summary, MAIN_BRANCH,
+    update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry,
+    ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot,
+    SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
+    MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
+    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
 };
 use crate::transaction::Transaction;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
@@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> {
         Ok(manifest_files)
     }
 
-    // # TODO
-    // Fulfill this function
-    fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
-        Summary {
-            operation: snapshot_produce_operation.operation(),
-            additional_properties: self.snapshot_properties.clone(),
+    // Returns a `Summary` of the current snapshot
+    fn summary<OP: SnapshotProduceOperation>(
+        &self,
+        snapshot_produce_operation: &OP,
+    ) -> Result<Summary> {
+        let mut summary_collector = SnapshotSummaryCollector::default();
+        let table_metadata = self.tx.table.metadata_ref();
+
+        let partition_summary_limit = if let Some(limit) = table_metadata
+            .properties()
+            .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
+        {
+            if let Ok(limit) = limit.parse::<u64>() {
+                limit
+            } else {
+                PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+            }
+        } else {
+            PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+        };
+
+        summary_collector.set_partition_summary_limit(partition_summary_limit);
+
+        for data_file in &self.added_data_files {
+            summary_collector.add_file(
+                data_file,
+                table_metadata.current_schema().clone(),
+                table_metadata.default_partition_spec().clone(),
+            );
         }
+
+        let previous_snapshot = table_metadata
+            .snapshot_by_id(self.snapshot_id)
+            .and_then(|snapshot| snapshot.parent_snapshot_id())
+            .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
+
+        let mut additional_properties = summary_collector.build();
+        additional_properties.extend(self.snapshot_properties.clone());
+
+        let summary = Summary {
+            operation: snapshot_produce_operation.operation(),
+            additional_properties,
+        };
+
+        update_snapshot_summaries(
+            summary,
+            previous_snapshot.map(|s| s.summary()),
+            snapshot_produce_operation.operation() == Operation::Overwrite,
+        )
     }
 
     fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
@@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> {
             .await?;
         let next_seq_num = self.tx.table.metadata().next_sequence_number();
 
-        let summary = self.summary(&snapshot_produce_operation);
+        let summary = self
+            .summary(&snapshot_produce_operation)
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
+                    .with_source(err)
+            })
+            .unwrap();
 
         let manifest_list_path = self.generate_manifest_list_file_path(0);
 

Reply via email to