This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c6cf0e98 fix: TableMetadata `last_updated_ms` not increased for all 
operations (#978)
c6cf0e98 is described below

commit c6cf0e98c49c3c27241a11883f053084927225bf
Author: Christian <[email protected]>
AuthorDate: Thu Feb 20 03:59:19 2025 +0100

    fix: TableMetadata `last_updated_ms` not increased for all operations (#978)
    
    Currently we increase the `last_updated_ms` timestamp only if a snapshot
    was added.
    Java always updates this timestamp, even if, e.g., only properties were
    added.
    
    Trino has a catalog integration test that validates this - which we
    failed due to this.
    
    This PR ensures that `last_updated_ms` is always updated for builds.
---
 crates/iceberg/src/spec/table_metadata_builder.rs | 61 +++++++++++++++++++----
 1 file changed, 51 insertions(+), 10 deletions(-)

diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs 
b/crates/iceberg/src/spec/table_metadata_builder.rs
index cbf2e5e3..28e2f4e8 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -54,6 +54,7 @@ pub struct TableMetadataBuilder {
     last_added_order_id: Option<i64>,
     // None if this is a new table (from_metadata) method not used
     previous_history_entry: Option<MetadataLog>,
+    last_updated_ms: Option<i64>,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -120,6 +121,7 @@ impl TableMetadataBuilder {
                 statistics: HashMap::new(),
                 partition_statistics: HashMap::new(),
             },
+            last_updated_ms: None,
             changes: vec![],
             last_added_schema_id: Some(schema_id),
             last_added_spec_id: None,
@@ -156,6 +158,7 @@ impl TableMetadataBuilder {
             last_added_schema_id: None,
             last_added_spec_id: None,
             last_added_order_id: None,
+            last_updated_ms: None,
         }
     }
 
@@ -368,13 +371,17 @@ impl TableMetadataBuilder {
             }
         }
 
-        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+        let max_last_updated = self
+            .last_updated_ms
+            .unwrap_or_default()
+            .max(self.metadata.last_updated_ms);
+        if snapshot.timestamp_ms() - max_last_updated < -ONE_MINUTE_MS {
             return Err(Error::new(
                 ErrorKind::DataInvalid,
                 format!(
                     "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
                     snapshot.timestamp_ms(),
-                    self.metadata.last_updated_ms
+                    max_last_updated
                 ),
             ));
         }
@@ -384,7 +391,7 @@ impl TableMetadataBuilder {
             snapshot: snapshot.clone(),
         });
 
-        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.last_updated_ms = Some(snapshot.timestamp_ms());
         self.metadata.last_sequence_number = snapshot.sequence_number();
         self.metadata
             .snapshots
@@ -483,19 +490,23 @@ impl TableMetadataBuilder {
             matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if 
snap.snapshot_id() == snapshot.snapshot_id())
         });
         if is_added_snapshot {
-            self.metadata.last_updated_ms = snapshot.timestamp_ms();
+            self.last_updated_ms = Some(snapshot.timestamp_ms());
         }
 
         // Current snapshot id is set only for the main branch
         if ref_name == MAIN_BRANCH {
             self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
-            if self.metadata.last_updated_ms == i64::default() {
-                self.metadata.last_updated_ms = 
chrono::Utc::now().timestamp_millis();
+            let timestamp_ms = if let Some(last_updated_ms) = 
self.last_updated_ms {
+                last_updated_ms
+            } else {
+                let last_updated_ms = chrono::Utc::now().timestamp_millis();
+                self.last_updated_ms = Some(last_updated_ms);
+                last_updated_ms
             };
 
             self.metadata.snapshot_log.push(SnapshotLog {
                 snapshot_id: snapshot.snapshot_id(),
-                timestamp_ms: self.metadata.last_updated_ms,
+                timestamp_ms,
             });
         }
 
@@ -911,9 +922,9 @@ impl TableMetadataBuilder {
 
     /// Build the table metadata.
     pub fn build(mut self) -> Result<TableMetadataBuildResult> {
-        if self.metadata.last_updated_ms == i64::default() {
-            self.metadata.last_updated_ms = 
chrono::Utc::now().timestamp_millis();
-        }
+        self.metadata.last_updated_ms = self
+            .last_updated_ms
+            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis());
 
         // Check compatibility of the current schema to the default partition 
spec and sort order.
         // We use the `get_xxx` methods from the builder to avoid using the 
panicking
@@ -1210,6 +1221,8 @@ impl From<TableMetadataBuildResult> for TableMetadata {
 
 #[cfg(test)]
 mod tests {
+    use std::thread::sleep;
+
     use super::*;
     use crate::spec::{
         BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, 
PrimitiveType, Schema,
@@ -2341,4 +2354,32 @@ mod tests {
         assert_eq!(build_result.metadata.partition_statistics.len(), 0);
         assert_eq!(build_result.changes.len(), 0);
     }
+
+    #[test]
+    fn last_update_increased_for_property_only_update() {
+        let builder = builder_without_changes(FormatVersion::V2);
+
+        let metadata = builder.build().unwrap().metadata;
+        let last_updated_ms = metadata.last_updated_ms;
+        sleep(std::time::Duration::from_millis(2));
+
+        let build_result = metadata
+            .into_builder(Some(
+                
"s3://bucket/test/location/metadata/metadata1.json".to_string(),
+            ))
+            .set_properties(HashMap::from_iter(vec![(
+                "foo".to_string(),
+                "bar".to_string(),
+            )]))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert!(
+            build_result.metadata.last_updated_ms > last_updated_ms,
+            "{} > {}",
+            build_result.metadata.last_updated_ms,
+            last_updated_ms
+        );
+    }
 }

Reply via email to