This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fcc88920 feat: Add summary functionality to `SnapshotProduceAction` (#1139)
fcc88920 is described below
commit fcc88920f52dbae53257757e2d33825bea4b51a9
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Apr 2 19:30:15 2025 -0400
feat: Add summary functionality to `SnapshotProduceAction` (#1139)
## Which issue does this PR close?
- Closes #724.
## What changes are included in this PR?
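Added summary functionality to `SnapshotProduceAction`: the snapshot summary is now built with `SnapshotSummaryCollector` from the added data files, honors the `write.summary.partition-limit` table property, and is finalized with `update_snapshot_summaries`.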
---
crates/iceberg/src/spec/mod.rs | 1 +
crates/iceberg/src/spec/snapshot_summary.rs | 17 ++++---
crates/iceberg/src/spec/table_metadata.rs | 5 +++
crates/iceberg/src/transaction/snapshot.rs | 70 ++++++++++++++++++++++++-----
4 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index a7fe9a39..5cf2afa1 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -41,6 +41,7 @@ pub use manifest_list::*;
pub use partition::*;
pub use schema::*;
pub use snapshot::*;
+pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
diff --git a/crates/iceberg/src/spec/snapshot_summary.rs
b/crates/iceberg/src/spec/snapshot_summary.rs
index fae04863..05f9fb8e 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -48,8 +48,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size";
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
+/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
+/// It gathers metrics about added or removed data files and manifests, and
tracks
+/// partition-specific updates.
#[derive(Default)]
-#[allow(dead_code)]
pub struct SnapshotSummaryCollector {
metrics: UpdateMetrics,
partition_metrics: HashMap<String, UpdateMetrics>,
@@ -58,17 +60,19 @@ pub struct SnapshotSummaryCollector {
trust_partition_metrics: bool,
}
-#[allow(dead_code)]
impl SnapshotSummaryCollector {
- // Set properties
+ /// Set properties for snapshot summary
pub fn set(&mut self, key: &str, value: &str) {
self.properties.insert(key.to_string(), value.to_string());
}
+ /// Sets the limit for including partition summaries. Summaries are not
+ /// included if the number of partitions is exceeded.
pub fn set_partition_summary_limit(&mut self, limit: u64) {
self.max_changed_partitions_for_summaries = limit;
}
+ /// Adds a data file to the summary collector
pub fn add_file(
&mut self,
data_file: &DataFile,
@@ -81,6 +85,7 @@ impl SnapshotSummaryCollector {
}
}
+ /// Removes a data file from the summary collector
pub fn remove_file(
&mut self,
data_file: &DataFile,
@@ -93,12 +98,14 @@ impl SnapshotSummaryCollector {
}
}
+ /// Adds a manifest to the summary collector
pub fn add_manifest(&mut self, manifest: &ManifestFile) {
self.trust_partition_metrics = false;
self.partition_metrics.clear();
self.metrics.add_manifest(manifest);
}
+ /// Updates partition-specific metrics for a data file.
pub fn update_partition_metrics(
&mut self,
schema: SchemaRef,
@@ -116,6 +123,7 @@ impl SnapshotSummaryCollector {
}
}
+ /// Merges another `SnapshotSummaryCollector` into the current one
pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
self.metrics.merge(&summary.metrics);
self.properties.extend(summary.properties);
@@ -133,6 +141,7 @@ impl SnapshotSummaryCollector {
}
}
+ /// Builds final map of summaries
pub fn build(&self) -> HashMap<String, String> {
let mut properties = self.metrics.to_map();
let changed_partitions_count = self.partition_metrics.len() as u64;
@@ -507,8 +516,6 @@ fn update_totals(
.insert(total_property.to_string(), new_total.to_string());
}
-// TODO: ancestors of function
-
#[cfg(test)]
mod tests {
use std::collections::HashMap;
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index e9e99605..59d69e0f 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -77,6 +77,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
"write.metadata.previo
/// Default value for max number of previous versions to keep.
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
+/// Property key for max number of partitions to keep summary stats for.
+pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str =
"write.summary.partition-limit";
+/// Default value for the max number of partitions to keep summary stats for.
+pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
+
/// Reserved Iceberg table properties list.
///
/// Reserved table properties are only used to control behaviors when creating
or updating a
diff --git a/crates/iceberg/src/transaction/snapshot.rs
b/crates/iceberg/src/transaction/snapshot.rs
index e1a2fa0d..f266491d 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -24,9 +24,11 @@ use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
- DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile,
ManifestListWriter,
- ManifestWriterBuilder, Operation, Snapshot, SnapshotReference,
SnapshotRetention, Struct,
- StructType, Summary, MAIN_BRANCH,
+ update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion,
ManifestEntry,
+ ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation,
Snapshot,
+ SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct,
StructType, Summary,
+ MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
+ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
};
use crate::transaction::Transaction;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
@@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> {
Ok(manifest_files)
}
- // # TODO
- // Fulfill this function
- fn summary<OP: SnapshotProduceOperation>(&self,
snapshot_produce_operation: &OP) -> Summary {
- Summary {
- operation: snapshot_produce_operation.operation(),
- additional_properties: self.snapshot_properties.clone(),
+ // Returns a `Summary` of the current snapshot
+ fn summary<OP: SnapshotProduceOperation>(
+ &self,
+ snapshot_produce_operation: &OP,
+ ) -> Result<Summary> {
+ let mut summary_collector = SnapshotSummaryCollector::default();
+ let table_metadata = self.tx.table.metadata_ref();
+
+ let partition_summary_limit = if let Some(limit) = table_metadata
+ .properties()
+ .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
+ {
+ if let Ok(limit) = limit.parse::<u64>() {
+ limit
+ } else {
+ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+ }
+ } else {
+ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+ };
+
+ summary_collector.set_partition_summary_limit(partition_summary_limit);
+
+ for data_file in &self.added_data_files {
+ summary_collector.add_file(
+ data_file,
+ table_metadata.current_schema().clone(),
+ table_metadata.default_partition_spec().clone(),
+ );
}
+
+ let previous_snapshot = table_metadata
+ .snapshot_by_id(self.snapshot_id)
+ .and_then(|snapshot| snapshot.parent_snapshot_id())
+ .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
+
+ let mut additional_properties = summary_collector.build();
+ additional_properties.extend(self.snapshot_properties.clone());
+
+ let summary = Summary {
+ operation: snapshot_produce_operation.operation(),
+ additional_properties,
+ };
+
+ update_snapshot_summaries(
+ summary,
+ previous_snapshot.map(|s| s.summary()),
+ snapshot_produce_operation.operation() == Operation::Overwrite,
+ )
}
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
@@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> {
.await?;
let next_seq_num = self.tx.table.metadata().next_sequence_number();
- let summary = self.summary(&snapshot_produce_operation);
+ let summary = self
+ .summary(&snapshot_produce_operation)
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to create snapshot
summary.")
+ .with_source(err)
+ })
+ .unwrap();
let manifest_list_path = self.generate_manifest_list_file_path(0);