This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9b736986 feat: Add support for rolling back to timestamp (#2879)
9b736986 is described below
commit 9b736986286fab5d5e43ec0130f36cfef49febeb
Author: geruh <[email protected]>
AuthorDate: Thu Jan 22 09:23:15 2026 -0800
feat: Add support for rolling back to timestamp (#2879)
# Rationale for this change
This PR adds the ability to roll back a table to an ancestral snapshot
given a timestamp. Some of this work was also done in #758, and this is a
progress PR to be merged after #2871 & #2878. It is standalone from
the other changes, but it makes use of the helpers in the other PRs.
Additionally, it adds some more tests.
## Are these changes tested?
Yes
## Are there any user-facing changes?
New API: `rollback_to_timestamp(timestamp_ms)` on `ManageSnapshots`
---------
Co-authored-by: Chinmay Bhat
<[email protected]>
---
pyiceberg/table/snapshots.py | 21 ++++++++
pyiceberg/table/update/snapshot.py | 21 ++++++++
tests/integration/test_snapshot_operations.py | 63 ++++++++++++++++++++++
tests/table/test_snapshots.py | 77 +++++++++++++++++++++++++++
4 files changed, 182 insertions(+)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 4ef1645d..797093f9 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None,
to_snapshot: Snapshot, tab
break
else:
yield from ancestors_of(to_snapshot, table_metadata)
+
+
+def latest_ancestor_before_timestamp(table_metadata: TableMetadata,
timestamp_ms: int) -> Snapshot | None:
+ """Find the latest ancestor snapshot whose timestamp is before the
provided timestamp.
+
+ Args:
+ table_metadata: The table metadata for a table
+ timestamp_ms: lookup snapshots strictly before this timestamp
+
+ Returns:
+ The latest ancestor snapshot older than the timestamp, or None if not
found.
+ """
+ result: Snapshot | None = None
+ result_timestamp: int = 0
+
+ for ancestor in ancestors_of(table_metadata.current_snapshot(),
table_metadata):
+ if timestamp_ms > ancestor.timestamp_ms > result_timestamp:
+ result = ancestor
+ result_timestamp = ancestor.timestamp_ms
+
+ return result
diff --git a/pyiceberg/table/update/snapshot.py
b/pyiceberg/table/update/snapshot.py
index 987200bf..b7c863d8 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -65,6 +65,7 @@ from pyiceberg.table.snapshots import (
SnapshotSummaryCollector,
Summary,
ancestors_of,
+ latest_ancestor_before_timestamp,
update_snapshot_summaries,
)
from pyiceberg.table.update import (
@@ -1008,6 +1009,26 @@ class
ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
return self.set_current_snapshot(snapshot_id=snapshot_id)
+ def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
+ """Rollback the table to the latest snapshot before the given
timestamp.
+
+ Finds the latest ancestor snapshot whose timestamp is before the given
timestamp and rolls back to it.
+
+ Args:
+ timestamp_ms: Rollback to the latest snapshot before this
timestamp in milliseconds.
+
+ Returns:
+ This for method chaining
+
+ Raises:
+ ValueError: If no valid snapshot exists older than the given
timestamp.
+ """
+ snapshot =
latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms)
+ if snapshot is None:
+ raise ValueError(f"Cannot roll back, no valid snapshot older than:
{timestamp_ms}")
+
+ return self.set_current_snapshot(snapshot_id=snapshot.snapshot_id)
+
def _is_current_ancestor(self, snapshot_id: int) -> bool:
return snapshot_id in self._current_ancestors()
diff --git a/tests/integration/test_snapshot_operations.py
b/tests/integration/test_snapshot_operations.py
index 8755e95f..6fd3aada 100644
--- a/tests/integration/test_snapshot_operations.py
+++ b/tests/integration/test_snapshot_operations.py
@@ -268,3 +268,66 @@ def
test_rollback_to_snapshot_unknown_id(table_with_snapshots: Table) -> None:
with pytest.raises(ValueError, match="Cannot roll back to unknown snapshot
id"):
table_with_snapshots.manage_snapshots().rollback_to_snapshot(snapshot_id=invalid_snapshot_id).commit()
+
+
[email protected]
+def test_rollback_to_timestamp_no_valid_snapshot(table_with_snapshots: Table)
-> None:
+ history = table_with_snapshots.history()
+ assert len(history) >= 1
+
+ oldest_timestamp = history[0].timestamp_ms
+
+ with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot
older than"):
+
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=oldest_timestamp).commit()
+
+
[email protected]
+def test_rollback_to_timestamp(table_with_snapshots: Table) -> None:
+ current_snapshot = table_with_snapshots.current_snapshot()
+ assert current_snapshot is not None
+ assert current_snapshot.parent_snapshot_id is not None
+
+ parent_snapshot_id = current_snapshot.parent_snapshot_id
+
+
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms).commit()
+
+ updated_snapshot = table_with_snapshots.current_snapshot()
+ assert updated_snapshot is not None
+ assert updated_snapshot.snapshot_id == parent_snapshot_id
+
+
[email protected]
+def test_rollback_to_timestamp_current_snapshot(table_with_snapshots: Table)
-> None:
+ current_snapshot = table_with_snapshots.current_snapshot()
+ assert current_snapshot is not None
+
+ timestamp_after_current = current_snapshot.timestamp_ms + 100
+
table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=timestamp_after_current).commit()
+
+ updated_snapshot = table_with_snapshots.current_snapshot()
+ assert updated_snapshot is not None
+ assert updated_snapshot.snapshot_id == current_snapshot.snapshot_id
+
+
[email protected]
+def test_rollback_to_timestamp_chained_with_tag(table_with_snapshots: Table)
-> None:
+ current_snapshot = table_with_snapshots.current_snapshot()
+ assert current_snapshot is not None
+ assert current_snapshot.parent_snapshot_id is not None
+
+ parent_snapshot_id = current_snapshot.parent_snapshot_id
+ tag_name = "my-tag"
+
+ (
+ table_with_snapshots.manage_snapshots()
+ .create_tag(snapshot_id=current_snapshot.snapshot_id,
tag_name=tag_name)
+ .rollback_to_timestamp(timestamp_ms=current_snapshot.timestamp_ms)
+ .commit()
+ )
+
+ updated_snapshot = table_with_snapshots.current_snapshot()
+ assert updated_snapshot is not None
+ assert updated_snapshot.snapshot_id == parent_snapshot_id
+ assert table_with_snapshots.metadata.refs[tag_name] == SnapshotRef(
+ snapshot_id=current_snapshot.snapshot_id, snapshot_ref_type="tag"
+ )
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index d26562ad..4aa9521b 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -30,6 +30,7 @@ from pyiceberg.table.snapshots import (
Summary,
ancestors_between,
ancestors_of,
+ latest_ancestor_before_timestamp,
update_snapshot_summaries,
)
from pyiceberg.transforms import IdentityTransform
@@ -456,3 +457,79 @@ def
test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
)
== 2000
)
+
+
+def test_latest_ancestor_before_timestamp() -> None:
+ from pyiceberg.table.metadata import TableMetadataV2
+
+ # Create metadata with 4 snapshots at ordered timestamps
+ metadata = TableMetadataV2(
+ **{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 4,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 1,
+ "current-schema-id": 0,
+ "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id":
1, "name": "x", "required": True, "type": "long"}]}],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "current-snapshot-id": 4,
+ "snapshots": [
+ {
+ "snapshot-id": 1,
+ "timestamp-ms": 1000,
+ "sequence-number": 1,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/1.avro",
+ },
+ {
+ "snapshot-id": 2,
+ "parent-snapshot-id": 1,
+ "timestamp-ms": 2000,
+ "sequence-number": 2,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/2.avro",
+ },
+ {
+ "snapshot-id": 3,
+ "parent-snapshot-id": 2,
+ "timestamp-ms": 3000,
+ "sequence-number": 3,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/3.avro",
+ },
+ {
+ "snapshot-id": 4,
+ "parent-snapshot-id": 3,
+ "timestamp-ms": 4000,
+ "sequence-number": 4,
+ "summary": {"operation": "append"},
+ "manifest-list": "s3://a/4.avro",
+ },
+ ],
+ }
+ )
+
+ result = latest_ancestor_before_timestamp(metadata, 3500)
+ assert result is not None
+ assert result.snapshot_id == 3
+
+ result = latest_ancestor_before_timestamp(metadata, 2500)
+ assert result is not None
+ assert result.snapshot_id == 2
+
+ result = latest_ancestor_before_timestamp(metadata, 5000)
+ assert result is not None
+ assert result.snapshot_id == 4
+
+ result = latest_ancestor_before_timestamp(metadata, 3000)
+ assert result is not None
+ assert result.snapshot_id == 2
+
+ result = latest_ancestor_before_timestamp(metadata, 1000)
+ assert result is None