This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b736986 feat: Add support for rolling back to timestamp (#2879)
9b736986 is described below

commit 9b736986286fab5d5e43ec0130f36cfef49febeb
Author: geruh <[email protected]>
AuthorDate: Thu Jan 22 09:23:15 2026 -0800

    feat: Add support for rolling back to timestamp (#2879)
    
    # Rationale for this change
    This PR adds the ability to roll back a table to an ancestral snapshot
    given a timestamp. Some of this work was also done in #758; this is an
    incremental PR to be merged after #2871 & #2878. It is standalone from
    the other changes, but it makes use of helpers from those PRs.
    
    It also adds more tests.
    
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    Yes: a new `ManageSnapshots.rollback_to_timestamp(timestamp_ms)` API.
    
    ---------
    
    Co-authored-by: Chinmay Bhat <[email protected]>
---
 pyiceberg/table/snapshots.py                  | 21 ++++++++
 pyiceberg/table/update/snapshot.py            | 21 ++++++++
 tests/integration/test_snapshot_operations.py | 63 ++++++++++++++++++++++
 tests/table/test_snapshots.py                 | 77 +++++++++++++++++++++++++++
 4 files changed, 182 insertions(+)

diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 4ef1645d..797093f9 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None, 
to_snapshot: Snapshot, tab
                 break
     else:
         yield from ancestors_of(to_snapshot, table_metadata)
+
+
+def latest_ancestor_before_timestamp(table_metadata: TableMetadata, 
timestamp_ms: int) -> Snapshot | None:
+    """Find the latest ancestor snapshot whose timestamp is before the 
provided timestamp.
+
+    Args:
+        table_metadata: The table metadata for a table
+        timestamp_ms: lookup snapshots strictly before this timestamp
+
+    Returns:
+        The latest ancestor snapshot older than the timestamp, or None if not 
found.
+    """
+    result: Snapshot | None = None
+    result_timestamp: int = 0
+
+    for ancestor in ancestors_of(table_metadata.current_snapshot(), 
table_metadata):
+        if timestamp_ms > ancestor.timestamp_ms > result_timestamp:
+            result = ancestor
+            result_timestamp = ancestor.timestamp_ms
+
+    return result
diff --git a/pyiceberg/table/update/snapshot.py 
b/pyiceberg/table/update/snapshot.py
index 987200bf..b7c863d8 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -65,6 +65,7 @@ from pyiceberg.table.snapshots import (
     SnapshotSummaryCollector,
     Summary,
     ancestors_of,
+    latest_ancestor_before_timestamp,
     update_snapshot_summaries,
 )
 from pyiceberg.table.update import (
@@ -1008,6 +1009,26 @@ class 
ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
 
         return self.set_current_snapshot(snapshot_id=snapshot_id)
 
+    def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
+        """Rollback the table to the latest snapshot before the given 
timestamp.
+
+        Finds the latest ancestor snapshot whose timestamp is before the given 
timestamp and rolls back to it.
+
+        Args:
+            timestamp_ms: Rollback to the latest snapshot before this 
timestamp in milliseconds.
+
+        Returns:
+            This for method chaining
+
+        Raises:
+            ValueError: If no valid snapshot exists older than the given 
timestamp.
+        """
+        snapshot = 
latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms)
+        if snapshot is None:
+            raise ValueError(f"Cannot roll back, no valid snapshot older than: 
{timestamp_ms}")
+
+        return self.set_current_snapshot(snapshot_id=snapshot.snapshot_id)
+
     def _is_current_ancestor(self, snapshot_id: int) -> bool:
         return snapshot_id in self._current_ancestors()
 
diff --git a/tests/integration/test_snapshot_operations.py 
b/tests/integration/test_snapshot_operations.py
index 8755e95f..6fd3aada 100644
--- a/tests/integration/test_snapshot_operations.py
+++ b/tests/integration/test_snapshot_operations.py
@@ -268,3 +268,66 @@ def 
test_rollback_to_snapshot_unknown_id(table_with_snapshots: Table) -> None:
 
     with pytest.raises(ValueError, match="Cannot roll back to unknown snapshot 
id"):
         
table_with_snapshots.manage_snapshots().rollback_to_snapshot(snapshot_id=invalid_snapshot_id).commit()
+
+
[email protected]
+def test_rollback_to_timestamp_no_valid_snapshot(table_with_snapshots: Table) 
-> None:
+    history = table_with_snapshots.history()
+    assert len(history) >= 1
+
+    oldest_timestamp = history[0].timestamp_ms
+
+    with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot 
older than"):
+        
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=oldest_timestamp).commit()
+
+
[email protected]
+def test_rollback_to_timestamp(table_with_snapshots: Table) -> None:
+    current_snapshot = table_with_snapshots.current_snapshot()
+    assert current_snapshot is not None
+    assert current_snapshot.parent_snapshot_id is not None
+
+    parent_snapshot_id = current_snapshot.parent_snapshot_id
+
+    
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms).commit()
+
+    updated_snapshot = table_with_snapshots.current_snapshot()
+    assert updated_snapshot is not None
+    assert updated_snapshot.snapshot_id == parent_snapshot_id
+
+
[email protected]
+def test_rollback_to_timestamp_current_snapshot(table_with_snapshots: Table) 
-> None:
+    current_snapshot = table_with_snapshots.current_snapshot()
+    assert current_snapshot is not None
+
+    timestamp_after_current = current_snapshot.timestamp_ms + 100
+    
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=timestamp_after_current).commit()
+
+    updated_snapshot = table_with_snapshots.current_snapshot()
+    assert updated_snapshot is not None
+    assert updated_snapshot.snapshot_id == current_snapshot.snapshot_id
+
+
[email protected]
+def test_rollback_to_timestamp_chained_with_tag(table_with_snapshots: Table) 
-> None:
+    current_snapshot = table_with_snapshots.current_snapshot()
+    assert current_snapshot is not None
+    assert current_snapshot.parent_snapshot_id is not None
+
+    parent_snapshot_id = current_snapshot.parent_snapshot_id
+    tag_name = "my-tag"
+
+    (
+        table_with_snapshots.manage_snapshots()
+        .create_tag(snapshot_id=current_snapshot.snapshot_id, 
tag_name=tag_name)
+        .rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms)
+        .commit()
+    )
+
+    updated_snapshot = table_with_snapshots.current_snapshot()
+    assert updated_snapshot is not None
+    assert updated_snapshot.snapshot_id == parent_snapshot_id
+    assert table_with_snapshots.metadata.refs[tag_name] == SnapshotRef(
+        snapshot_id=current_snapshot.snapshot_id, snapshot_ref_type="tag"
+    )
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index d26562ad..4aa9521b 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -30,6 +30,7 @@ from pyiceberg.table.snapshots import (
     Summary,
     ancestors_between,
     ancestors_of,
+    latest_ancestor_before_timestamp,
     update_snapshot_summaries,
 )
 from pyiceberg.transforms import IdentityTransform
@@ -456,3 +457,79 @@ def 
test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
         )
         == 2000
     )
+
+
+def test_latest_ancestor_before_timestamp() -> None:
+    from pyiceberg.table.metadata import TableMetadataV2
+
+    # Create metadata with 4 snapshots at ordered timestamps
+    metadata = TableMetadataV2(
+        **{
+            "format-version": 2,
+            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+            "location": "s3://bucket/test/location",
+            "last-sequence-number": 4,
+            "last-updated-ms": 1602638573590,
+            "last-column-id": 1,
+            "current-schema-id": 0,
+            "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 
1, "name": "x", "required": True, "type": "long"}]}],
+            "default-spec-id": 0,
+            "partition-specs": [{"spec-id": 0, "fields": []}],
+            "last-partition-id": 999,
+            "default-sort-order-id": 0,
+            "sort-orders": [{"order-id": 0, "fields": []}],
+            "current-snapshot-id": 4,
+            "snapshots": [
+                {
+                    "snapshot-id": 1,
+                    "timestamp-ms": 1000,
+                    "sequence-number": 1,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/1.avro",
+                },
+                {
+                    "snapshot-id": 2,
+                    "parent-snapshot-id": 1,
+                    "timestamp-ms": 2000,
+                    "sequence-number": 2,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/2.avro",
+                },
+                {
+                    "snapshot-id": 3,
+                    "parent-snapshot-id": 2,
+                    "timestamp-ms": 3000,
+                    "sequence-number": 3,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/3.avro",
+                },
+                {
+                    "snapshot-id": 4,
+                    "parent-snapshot-id": 3,
+                    "timestamp-ms": 4000,
+                    "sequence-number": 4,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/4.avro",
+                },
+            ],
+        }
+    )
+
+    result = latest_ancestor_before_timestamp(metadata, 3500)
+    assert result is not None
+    assert result.snapshot_id == 3
+
+    result = latest_ancestor_before_timestamp(metadata, 2500)
+    assert result is not None
+    assert result.snapshot_id == 2
+
+    result = latest_ancestor_before_timestamp(metadata, 5000)
+    assert result is not None
+    assert result.snapshot_id == 4
+
+    result = latest_ancestor_before_timestamp(metadata, 3000)
+    assert result is not None
+    assert result.snapshot_id == 2
+
+    result = latest_ancestor_before_timestamp(metadata, 1000)
+    assert result is None

Reply via email to