This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3994b5d9 feat: implement set snapshot (#509)
3994b5d9 is described below
commit 3994b5d98eb17907ce591aa4258787a459c78735
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 20 20:40:17 2026 +0800
feat: implement set snapshot (#509)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/set_snapshot_test.cc | 155 ++++++++++++++++++++++++++++++
src/iceberg/transaction.cc | 90 ++++++++++-------
src/iceberg/transaction.h | 4 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/pending_update.h | 1 +
src/iceberg/update/set_snapshot.cc | 141 +++++++++++++++++++++++++++
src/iceberg/update/set_snapshot.h | 71 ++++++++++++++
src/iceberg/util/snapshot_util.cc | 31 +++++-
src/iceberg/util/snapshot_util_internal.h | 17 ++++
13 files changed, 475 insertions(+), 40 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 9ff802a1..e3db640e 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
update/expire_snapshots.cc
update/fast_append.cc
update/pending_update.cc
+ update/set_snapshot.cc
update/snapshot_update.cc
update/update_location.cc
update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index febe39c9..c50aa1b2 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -106,6 +106,7 @@ iceberg_sources = files(
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/pending_update.cc',
+ 'update/set_snapshot.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
'update/update_partition_spec.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 3414a862..4c1679b7 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
+ set_snapshot_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc
new file mode 100644
index 00000000..6bbd59b8
--- /dev/null
+++ b/src/iceberg/test/set_snapshot_test.cc
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/set_snapshot.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+
+namespace iceberg {
+
+class SetSnapshotTest : public UpdateTestBase {
+ protected:
+ // Snapshot IDs from TableMetadataV2Valid.json
+ static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
+ static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
+
+ // Timestamps from TableMetadataV2Valid.json
+ static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
+ static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
+};
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
+
+ set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+
+ // Commit and verify the change was persisted
+ EXPECT_THAT(set_snapshot->Commit(), IsOk());
+ EXPECT_THAT(transaction->Commit(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+ EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Try to set to a non-existent snapshot
+ int64_t invalid_snapshot_id = 9999999999999999;
+ set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
+
+ auto result = set_snapshot->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("is not found"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToValid) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Rollback to the oldest snapshot (which is an ancestor)
+ set_snapshot->RollbackTo(kOldestSnapshotId);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Try to rollback to a non-existent snapshot
+ int64_t invalid_snapshot_id = 9999999999999999;
+ set_snapshot->RollbackTo(invalid_snapshot_id);
+
+ auto result = set_snapshot->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Rollback to a time between the two snapshots
+ // This should select the oldest snapshot
+ int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
+ set_snapshot->RollbackToTime(time_between);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Try to rollback to a time before any snapshot
+ int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
+ set_snapshot->RollbackToTime(time_before_all);
+
+ // Should fail - no snapshot older than the specified time
+ auto result = set_snapshot->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ // Rollback to a timestamp just after the oldest snapshot
+ int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
+ set_snapshot->RollbackToTime(time_just_after_oldest);
+
+ // Apply and verify - should return the oldest snapshot
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
+ ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+ ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+ EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
+
+ EXPECT_THAT(set_snapshot->Commit(), IsOk());
+ EXPECT_THAT(transaction->Commit(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+ ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+ EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index d10586a4..0f950b02 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -23,6 +23,8 @@
#include <optional>
#include "iceberg/catalog.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
@@ -32,6 +34,7 @@
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/fast_append.h"
#include "iceberg/update/pending_update.h"
+#include "iceberg/update/set_snapshot.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
#include "iceberg/update/update_partition_spec.h"
@@ -96,23 +99,35 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
Status Transaction::Apply(PendingUpdate& update) {
switch (update.kind()) {
- case PendingUpdate::Kind::kUpdateProperties: {
- auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
- if (!result.updates.empty()) {
- metadata_builder_->SetProperties(std::move(result.updates));
+ case PendingUpdate::Kind::kExpireSnapshots: {
+ auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
+ if (!result.snapshot_ids_to_remove.empty()) {
+ metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
}
- if (!result.removals.empty()) {
- metadata_builder_->RemoveProperties(std::move(result.removals));
+ if (!result.refs_to_remove.empty()) {
+ for (const auto& ref_name : result.refs_to_remove) {
+ metadata_builder_->RemoveRef(ref_name);
+ }
}
- if (result.format_version.has_value()) {
- metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+ if (!result.partition_spec_ids_to_remove.empty()) {
+ metadata_builder_->RemovePartitionSpecs(
+ std::move(result.partition_spec_ids_to_remove));
+ }
+ if (!result.schema_ids_to_remove.empty()) {
+ metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
}
} break;
- case PendingUpdate::Kind::kUpdateSortOrder: {
- auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
- metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+ case PendingUpdate::Kind::kSetSnapshot: {
+ auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
+ metadata_builder_->SetBranchSnapshot(snapshot_id,
+ std::string(SnapshotRef::kMainBranch));
+ } break;
+ case PendingUpdate::Kind::kUpdateLocation: {
+ auto& update_location = internal::checked_cast<UpdateLocation&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
+ metadata_builder_->SetLocation(location);
} break;
case PendingUpdate::Kind::kUpdatePartitionSpec: {
auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
@@ -123,12 +138,30 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AddPartitionSpec(std::move(result.spec));
}
} break;
+ case PendingUpdate::Kind::kUpdateProperties: {
+ auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
+ if (!result.updates.empty()) {
+ metadata_builder_->SetProperties(std::move(result.updates));
+ }
+ if (!result.removals.empty()) {
+ metadata_builder_->RemoveProperties(std::move(result.removals));
+ }
+ if (result.format_version.has_value()) {
+ metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+ }
+ } break;
case PendingUpdate::Kind::kUpdateSchema: {
auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
metadata_builder_->SetCurrentSchema(std::move(result.schema),
result.new_last_column_id);
} break;
+ case PendingUpdate::Kind::kUpdateSortOrder: {
+ auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
+ metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+ } break;
case PendingUpdate::Kind::kUpdateSnapshot: {
const auto& base = metadata_builder_->current();
@@ -165,30 +198,6 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
- case PendingUpdate::Kind::kExpireSnapshots: {
- auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
- if (!result.snapshot_ids_to_remove.empty()) {
- metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
- }
- if (!result.refs_to_remove.empty()) {
- for (const auto& ref_name : result.refs_to_remove) {
- metadata_builder_->RemoveRef(ref_name);
- }
- }
- if (!result.partition_spec_ids_to_remove.empty()) {
- metadata_builder_->RemovePartitionSpecs(
- std::move(result.partition_spec_ids_to_remove));
- }
- if (!result.schema_ids_to_remove.empty()) {
- metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
- }
- } break;
- case PendingUpdate::Kind::kUpdateLocation: {
- auto& update_location = internal::checked_cast<UpdateLocation&>(update);
- ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
- metadata_builder_->SetLocation(location);
- } break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -293,6 +302,13 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
return update_location;
}
+Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
+ SetSnapshot::Make(shared_from_this()));
+ ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
+ return set_snapshot;
+}
+
Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
FastAppend::Make(table_->name().name, shared_from_this()));
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 0f567312..23b01fb1 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
+ /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
+ /// previous snapshot and commit the changes.
+ Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
+
/// \brief Create a new FastAppend to append data files and commit the changes.
Result<std::shared_ptr<FastAppend>> NewFastAppend();
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 5bf03a00..40736419 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -192,6 +192,7 @@ class Transaction;
class ExpireSnapshots;
class FastAppend;
class PendingUpdate;
+class SetSnapshot;
class SnapshotUpdate;
class UpdateLocation;
class UpdatePartitionSpec;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 4ca40684..2bbd1037 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -20,6 +20,7 @@ install_headers(
'expire_snapshots.h',
'fast_append.h',
'pending_update.h',
+ 'set_snapshot.h',
'snapshot_update.h',
'update_location.h',
'update_partition_spec.h',
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 441d086a..68c2d205 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
public:
enum class Kind : uint8_t {
kExpireSnapshots,
+ kSetSnapshot,
kUpdateLocation,
kUpdatePartitionSpec,
kUpdateProperties,
diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc
new file mode 100644
index 00000000..7258bc4d
--- /dev/null
+++ b/src/iceberg/update/set_snapshot.cc
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/set_snapshot.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<SetSnapshot>> SetSnapshot::Make(
+ std::shared_ptr<Transaction> transaction) {
+ ICEBERG_PRECHECK(transaction != nullptr,
+ "Cannot create SetSnapshot without a transaction");
+ return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(transaction)));
+}
+
+SetSnapshot::SetSnapshot(std::shared_ptr<Transaction> transaction)
+ : PendingUpdate(std::move(transaction)) {}
+
+SetSnapshot::~SetSnapshot() = default;
+
+SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) {
+ // Validate that the snapshot exists
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, base().SnapshotById(snapshot_id));
+ ICEBERG_BUILDER_CHECK(snapshot != nullptr,
+ "Cannot roll back to unknown snapshot id: {}", snapshot_id);
+ target_snapshot_id_ = snapshot_id;
+ return *this;
+}
+
+SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) {
+ // Find the latest snapshot by timestamp older than timestamp_ms
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot_id_opt,
+ FindLatestAncestorOlderThan(timestamp_ms));
+
+ ICEBERG_BUILDER_CHECK(snapshot_id_opt.has_value(),
+ "Cannot roll back, no valid snapshot older than: {}",
+ timestamp_ms);
+
+ target_snapshot_id_ = snapshot_id_opt.value();
+ is_rollback_ = true;
+
+ return *this;
+}
+
+SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
+ // Validate that the snapshot exists
+ auto snapshot_result = base().SnapshotById(snapshot_id);
+ ICEBERG_BUILDER_CHECK(snapshot_result.has_value(),
+ "Cannot roll back to unknown snapshot id: {}", snapshot_id);
+
+ // Validate that the snapshot is an ancestor of the current state
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(bool is_ancestor,
+ SnapshotUtil::IsAncestorOf(base(), snapshot_id));
+ ICEBERG_BUILDER_CHECK(
+ is_ancestor,
+ "Cannot roll back to snapshot, not an ancestor of the current state: {}",
+ snapshot_id);
+
+ return SetCurrentSnapshot(snapshot_id);
+}
+
+Result<int64_t> SetSnapshot::Apply() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+ const TableMetadata& base_metadata = transaction_->current();
+
+ // If no target snapshot was configured, return current state (NOOP)
+ if (!target_snapshot_id_.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, base_metadata.Snapshot());
+ return current_snapshot->snapshot_id;
+ }
+
+ // Validate that the snapshot exists
+ auto snapshot_result = base_metadata.SnapshotById(target_snapshot_id_.value());
+ ICEBERG_CHECK(snapshot_result.has_value(),
+ "Cannot roll back to unknown snapshot id: {}",
+ target_snapshot_id_.value());
+
+ // If this is a rollback, validate that the target is still an ancestor
+ if (is_rollback_) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ bool is_ancestor,
+ SnapshotUtil::IsAncestorOf(base_metadata, target_snapshot_id_.value()));
+ ICEBERG_CHECK(is_ancestor,
+ "Cannot roll back to {}: not an ancestor of the current table state",
+ target_snapshot_id_.value());
+ }
+
+ return target_snapshot_id_.value();
+}
+
+Result<std::optional<int64_t>> SetSnapshot::FindLatestAncestorOlderThan(
+ int64_t timestamp_ms) const {
+ ICEBERG_ASSIGN_OR_RAISE(auto ancestors, SnapshotUtil::CurrentAncestors(base()));
+
+ TimePointMs target_timestamp = TimePointMsFromUnixMs(timestamp_ms);
+ TimePointMs latest_timestamp = TimePointMsFromUnixMs(0);
+ std::optional<int64_t> result = std::nullopt;
+
+ for (const auto& snapshot : ancestors) {
+ if (snapshot == nullptr) {
+ continue;
+ }
+ auto current_timestamp = snapshot->timestamp_ms;
+ if (current_timestamp < target_timestamp && current_timestamp > latest_timestamp) {
+ latest_timestamp = current_timestamp;
+ result = snapshot->snapshot_id;
+ }
+ }
+
+ return result;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h
new file mode 100644
index 00000000..1ad39960
--- /dev/null
+++ b/src/iceberg/update/set_snapshot.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/set_snapshot.h
+/// \brief Sets the current snapshot directly or by rolling back.
+
+namespace iceberg {
+
+/// \brief Sets the current snapshot directly or by rolling back.
+class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
+ public:
+ static Result<std::shared_ptr<SetSnapshot>> Make(
+ std::shared_ptr<Transaction> transaction);
+
+ ~SetSnapshot() override;
+
+ /// \brief Sets the table's current state to a specific Snapshot identified by id.
+ SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id);
+
+ /// \brief Rolls back the table's state to the last Snapshot before the given timestamp.
+ SetSnapshot& RollbackToTime(int64_t timestamp_ms);
+
+ /// \brief Rollback table's state to a specific Snapshot identified by id.
+ SetSnapshot& RollbackTo(int64_t snapshot_id);
+
+ Kind kind() const final { return Kind::kSetSnapshot; }
+
+ /// \brief Apply the pending changes and return the target snapshot ID.
+ Result<int64_t> Apply();
+
+ private:
+ explicit SetSnapshot(std::shared_ptr<Transaction> transaction);
+
+ /// \brief Find the latest snapshot whose timestamp is before the provided timestamp.
+ ///
+ /// \param timestamp_ms Lookup snapshots before this timestamp
+ /// \return The snapshot ID that was current at the given timestamp, or nullopt
+ Result<std::optional<int64_t>> FindLatestAncestorOlderThan(int64_t timestamp_ms) const;
+
+ std::optional<int64_t> target_snapshot_id_;
+ bool is_rollback_{false};
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index c3b93be8..d3d5669c 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -69,6 +69,22 @@ Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
}
+Result<bool> SnapshotUtil::IsAncestorOf(const TableMetadata& metadata,
+ int64_t ancestor_snapshot_id) {
+ ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
+ ICEBERG_CHECK(current != nullptr, "Current snapshot is null");
+
+ // Create a lookup function that uses the metadata
+ auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+ return metadata.SnapshotById(id);
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(current->snapshot_id, lookup));
+ return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) {
+ return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id;
+ });
+}
+
Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_parent_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
@@ -81,9 +97,18 @@ Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapsh
Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
const Table& table) {
- auto current_result = table.current_snapshot();
- ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {});
- return AncestorsOf(table, current_result.value());
+ ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+ return AncestorsOf(table, current);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+ const TableMetadata& metadata) {
+ ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
+ auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+ return metadata.SnapshotById(id);
+ };
+
+ return AncestorsOf(current, lookup);
}
Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& table) {
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index ca106cb3..0a000c69 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -70,6 +70,15 @@ class ICEBERG_EXPORT SnapshotUtil {
/// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
static Result<bool> IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id);
+ /// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current
+ /// state.
+ ///
+ /// \param metadata The table metadata to check
+ /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+ /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
+ static Result<bool> IsAncestorOf(const TableMetadata& metadata,
+ int64_t ancestor_snapshot_id);
+
/// \brief Returns whether some ancestor of snapshot_id has parentId matches
/// ancestor_parent_snapshot_id.
///
@@ -88,6 +97,14 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
const Table& table);
+ /// \brief Returns a vector that traverses the metadata's snapshots from the current to
+ /// the last known ancestor.
+ ///
+ /// \param metadata The table metadata
+ /// \return A vector from the metadata's current snapshot to its last known ancestor
+ static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
+ const TableMetadata& metadata);
+
/// \brief Returns the snapshot IDs for the ancestors of the current table state.
///
/// Ancestor IDs are ordered by commit time, descending. The first ID is the current