This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3994b5d9 feat: implement set snapshot (#509)
3994b5d9 is described below

commit 3994b5d98eb17907ce591aa4258787a459c78735
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 20 20:40:17 2026 +0800

    feat: implement set snapshot (#509)
---
 src/iceberg/CMakeLists.txt                |   1 +
 src/iceberg/meson.build                   |   1 +
 src/iceberg/test/CMakeLists.txt           |   1 +
 src/iceberg/test/set_snapshot_test.cc     | 155 ++++++++++++++++++++++++++++++
 src/iceberg/transaction.cc                |  90 ++++++++++-------
 src/iceberg/transaction.h                 |   4 +
 src/iceberg/type_fwd.h                    |   1 +
 src/iceberg/update/meson.build            |   1 +
 src/iceberg/update/pending_update.h       |   1 +
 src/iceberg/update/set_snapshot.cc        | 141 +++++++++++++++++++++++++++
 src/iceberg/update/set_snapshot.h         |  71 ++++++++++++++
 src/iceberg/util/snapshot_util.cc         |  31 +++++-
 src/iceberg/util/snapshot_util_internal.h |  17 ++++
 13 files changed, 475 insertions(+), 40 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 9ff802a1..e3db640e 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
     update/expire_snapshots.cc
     update/fast_append.cc
     update/pending_update.cc
+    update/set_snapshot.cc
     update/snapshot_update.cc
     update/update_location.cc
     update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index febe39c9..c50aa1b2 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -106,6 +106,7 @@ iceberg_sources = files(
     'update/expire_snapshots.cc',
     'update/fast_append.cc',
     'update/pending_update.cc',
+    'update/set_snapshot.cc',
     'update/snapshot_update.cc',
     'update/update_location.cc',
     'update/update_partition_spec.cc',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 3414a862..4c1679b7 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    SOURCES
                    expire_snapshots_test.cc
                    fast_append_test.cc
+                   set_snapshot_test.cc
                    transaction_test.cc
                    update_location_test.cc
                    update_partition_spec_test.cc
diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc
new file mode 100644
index 00000000..6bbd59b8
--- /dev/null
+++ b/src/iceberg/test/set_snapshot_test.cc
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/set_snapshot.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+
+namespace iceberg {
+
+class SetSnapshotTest : public UpdateTestBase {
+ protected:
+  // Snapshot IDs from TableMetadataV2Valid.json
+  static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
+  static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
+
+  // Timestamps from TableMetadataV2Valid.json
+  static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
+  static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
+};
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
+
+  set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+
+  // Commit and verify the change was persisted
+  EXPECT_THAT(set_snapshot->Commit(), IsOk());
+  EXPECT_THAT(transaction->Commit(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+  EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Try to set to a non-existent snapshot
+  int64_t invalid_snapshot_id = 9999999999999999;
+  set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
+
+  auto result = set_snapshot->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("is not found"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToValid) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Rollback to the oldest snapshot (which is an ancestor)
+  set_snapshot->RollbackTo(kOldestSnapshotId);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Try to rollback to a non-existent snapshot
+  int64_t invalid_snapshot_id = 9999999999999999;
+  set_snapshot->RollbackTo(invalid_snapshot_id);
+
+  auto result = set_snapshot->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Rollback to a time between the two snapshots
+  // This should select the oldest snapshot
+  int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
+  set_snapshot->RollbackToTime(time_between);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Try to rollback to a time before any snapshot
+  int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
+  set_snapshot->RollbackToTime(time_before_all);
+
+  // Should fail - no snapshot older than the specified time
+  auto result = set_snapshot->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
+}
+
+TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  // Rollback to a timestamp just after the oldest snapshot
+  int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
+  set_snapshot->RollbackToTime(time_just_after_oldest);
+
+  // Apply and verify - should return the oldest snapshot
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
+}
+
+TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
+  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
+  EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
+
+  EXPECT_THAT(set_snapshot->Commit(), IsOk());
+  EXPECT_THAT(transaction->Commit(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+  EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index d10586a4..0f950b02 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -23,6 +23,8 @@
 #include <optional>
 
 #include "iceberg/catalog.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
 #include "iceberg/table.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/table_properties.h"
@@ -32,6 +34,7 @@
 #include "iceberg/update/expire_snapshots.h"
 #include "iceberg/update/fast_append.h"
 #include "iceberg/update/pending_update.h"
+#include "iceberg/update/set_snapshot.h"
 #include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_location.h"
 #include "iceberg/update/update_partition_spec.h"
@@ -96,23 +99,35 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
 
 Status Transaction::Apply(PendingUpdate& update) {
   switch (update.kind()) {
-    case PendingUpdate::Kind::kUpdateProperties: {
-      auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
-      if (!result.updates.empty()) {
-        metadata_builder_->SetProperties(std::move(result.updates));
+    case PendingUpdate::Kind::kExpireSnapshots: {
+      auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
+      if (!result.snapshot_ids_to_remove.empty()) {
+        metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
       }
-      if (!result.removals.empty()) {
-        metadata_builder_->RemoveProperties(std::move(result.removals));
+      if (!result.refs_to_remove.empty()) {
+        for (const auto& ref_name : result.refs_to_remove) {
+          metadata_builder_->RemoveRef(ref_name);
+        }
       }
-      if (result.format_version.has_value()) {
-        metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+      if (!result.partition_spec_ids_to_remove.empty()) {
+        metadata_builder_->RemovePartitionSpecs(
+            std::move(result.partition_spec_ids_to_remove));
+      }
+      if (!result.schema_ids_to_remove.empty()) {
+        metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
       }
     } break;
-    case PendingUpdate::Kind::kUpdateSortOrder: {
-      auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
-      metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+    case PendingUpdate::Kind::kSetSnapshot: {
+      auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
+      metadata_builder_->SetBranchSnapshot(snapshot_id,
+                                           std::string(SnapshotRef::kMainBranch));
+    } break;
+    case PendingUpdate::Kind::kUpdateLocation: {
+      auto& update_location = internal::checked_cast<UpdateLocation&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
+      metadata_builder_->SetLocation(location);
     } break;
     case PendingUpdate::Kind::kUpdatePartitionSpec: {
       auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
@@ -123,12 +138,30 @@ Status Transaction::Apply(PendingUpdate& update) {
         metadata_builder_->AddPartitionSpec(std::move(result.spec));
       }
     } break;
+    case PendingUpdate::Kind::kUpdateProperties: {
+      auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
+      if (!result.updates.empty()) {
+        metadata_builder_->SetProperties(std::move(result.updates));
+      }
+      if (!result.removals.empty()) {
+        metadata_builder_->RemoveProperties(std::move(result.removals));
+      }
+      if (result.format_version.has_value()) {
+        metadata_builder_->UpgradeFormatVersion(result.format_version.value());
+      }
+    } break;
     case PendingUpdate::Kind::kUpdateSchema: {
       auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
       ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
       metadata_builder_->SetCurrentSchema(std::move(result.schema),
                                           result.new_last_column_id);
     } break;
+    case PendingUpdate::Kind::kUpdateSortOrder: {
+      auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
+      metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
+    } break;
     case PendingUpdate::Kind::kUpdateSnapshot: {
       const auto& base = metadata_builder_->current();
 
@@ -165,30 +198,6 @@ Status Transaction::Apply(PendingUpdate& update) {
         metadata_builder_->AssignUUID();
       }
     } break;
-    case PendingUpdate::Kind::kExpireSnapshots: {
-      auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
-      if (!result.snapshot_ids_to_remove.empty()) {
-        metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
-      }
-      if (!result.refs_to_remove.empty()) {
-        for (const auto& ref_name : result.refs_to_remove) {
-          metadata_builder_->RemoveRef(ref_name);
-        }
-      }
-      if (!result.partition_spec_ids_to_remove.empty()) {
-        metadata_builder_->RemovePartitionSpecs(
-            std::move(result.partition_spec_ids_to_remove));
-      }
-      if (!result.schema_ids_to_remove.empty()) {
-        metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
-      }
-    } break;
-    case PendingUpdate::Kind::kUpdateLocation: {
-      auto& update_location = internal::checked_cast<UpdateLocation&>(update);
-      ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
-      metadata_builder_->SetLocation(location);
-    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -293,6 +302,13 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
   return update_location;
 }
 
+Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> set_snapshot,
+                          SetSnapshot::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot));
+  return set_snapshot;
+}
+
 Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
                           FastAppend::Make(table_->name().name, shared_from_this()));
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 0f567312..23b01fb1 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -86,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
   /// changes.
   Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
 
+  /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
+  /// previous snapshot and commit the changes.
+  Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
+
   /// \brief Create a new FastAppend to append data files and commit the changes.
   Result<std::shared_ptr<FastAppend>> NewFastAppend();
 
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 5bf03a00..40736419 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -192,6 +192,7 @@ class Transaction;
 class ExpireSnapshots;
 class FastAppend;
 class PendingUpdate;
+class SetSnapshot;
 class SnapshotUpdate;
 class UpdateLocation;
 class UpdatePartitionSpec;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 4ca40684..2bbd1037 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -20,6 +20,7 @@ install_headers(
         'expire_snapshots.h',
         'fast_append.h',
         'pending_update.h',
+        'set_snapshot.h',
         'snapshot_update.h',
         'update_location.h',
         'update_partition_spec.h',
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 441d086a..68c2d205 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
  public:
   enum class Kind : uint8_t {
     kExpireSnapshots,
+    kSetSnapshot,
     kUpdateLocation,
     kUpdatePartitionSpec,
     kUpdateProperties,
diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc
new file mode 100644
index 00000000..7258bc4d
--- /dev/null
+++ b/src/iceberg/update/set_snapshot.cc
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/set_snapshot.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<SetSnapshot>> SetSnapshot::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create SetSnapshot without a transaction");
+  return std::shared_ptr<SetSnapshot>(new SetSnapshot(std::move(transaction)));
+}
+
+SetSnapshot::SetSnapshot(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)) {}
+
+SetSnapshot::~SetSnapshot() = default;
+
+SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) {
+  // Validate that the snapshot exists
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, base().SnapshotById(snapshot_id));
+  ICEBERG_BUILDER_CHECK(snapshot != nullptr,
+                        "Cannot roll back to unknown snapshot id: {}", snapshot_id);
+  target_snapshot_id_ = snapshot_id;
+  return *this;
+}
+
+SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) {
+  // Find the latest snapshot by timestamp older than timestamp_ms
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot_id_opt,
+                                   FindLatestAncestorOlderThan(timestamp_ms));
+
+  ICEBERG_BUILDER_CHECK(snapshot_id_opt.has_value(),
+                        "Cannot roll back, no valid snapshot older than: {}",
+                        timestamp_ms);
+
+  target_snapshot_id_ = snapshot_id_opt.value();
+  is_rollback_ = true;
+
+  return *this;
+}
+
+SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) {
+  // Validate that the snapshot exists
+  auto snapshot_result = base().SnapshotById(snapshot_id);
+  ICEBERG_BUILDER_CHECK(snapshot_result.has_value(),
+                        "Cannot roll back to unknown snapshot id: {}", snapshot_id);
+
+  // Validate that the snapshot is an ancestor of the current state
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(bool is_ancestor,
+                                   SnapshotUtil::IsAncestorOf(base(), snapshot_id));
+  ICEBERG_BUILDER_CHECK(
+      is_ancestor,
+      "Cannot roll back to snapshot, not an ancestor of the current state: {}",
+      snapshot_id);
+
+  return SetCurrentSnapshot(snapshot_id);
+}
+
+Result<int64_t> SetSnapshot::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  const TableMetadata& base_metadata = transaction_->current();
+
+  // If no target snapshot was configured, return current state (NOOP)
+  if (!target_snapshot_id_.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, base_metadata.Snapshot());
+    return current_snapshot->snapshot_id;
+  }
+
+  // Validate that the snapshot exists
+  auto snapshot_result = base_metadata.SnapshotById(target_snapshot_id_.value());
+  ICEBERG_CHECK(snapshot_result.has_value(),
+                "Cannot roll back to unknown snapshot id: {}",
+                target_snapshot_id_.value());
+
+  // If this is a rollback, validate that the target is still an ancestor
+  if (is_rollback_) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        bool is_ancestor,
+        SnapshotUtil::IsAncestorOf(base_metadata, target_snapshot_id_.value()));
+    ICEBERG_CHECK(is_ancestor,
+                  "Cannot roll back to {}: not an ancestor of the current table state",
+                  target_snapshot_id_.value());
+  }
+
+  return target_snapshot_id_.value();
+}
+
+Result<std::optional<int64_t>> SetSnapshot::FindLatestAncestorOlderThan(
+    int64_t timestamp_ms) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, SnapshotUtil::CurrentAncestors(base()));
+
+  TimePointMs target_timestamp = TimePointMsFromUnixMs(timestamp_ms);
+  TimePointMs latest_timestamp = TimePointMsFromUnixMs(0);
+  std::optional<int64_t> result = std::nullopt;
+
+  for (const auto& snapshot : ancestors) {
+    if (snapshot == nullptr) {
+      continue;
+    }
+    auto current_timestamp = snapshot->timestamp_ms;
+    if (current_timestamp < target_timestamp && current_timestamp > latest_timestamp) {
+      latest_timestamp = current_timestamp;
+      result = snapshot->snapshot_id;
+    }
+  }
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h
new file mode 100644
index 00000000..1ad39960
--- /dev/null
+++ b/src/iceberg/update/set_snapshot.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/set_snapshot.h
+/// \brief Sets the current snapshot directly or by rolling back.
+
+namespace iceberg {
+
+/// \brief Sets the current snapshot directly or by rolling back.
+class ICEBERG_EXPORT SetSnapshot : public PendingUpdate {
+ public:
+  static Result<std::shared_ptr<SetSnapshot>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~SetSnapshot() override;
+
+  /// \brief Sets the table's current state to a specific Snapshot identified by id.
+  SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id);
+
+  /// \brief Rolls back the table's state to the last Snapshot before the given timestamp.
+  SetSnapshot& RollbackToTime(int64_t timestamp_ms);
+
+  /// \brief Rollback table's state to a specific Snapshot identified by id.
+  SetSnapshot& RollbackTo(int64_t snapshot_id);
+
+  Kind kind() const final { return Kind::kSetSnapshot; }
+
+  /// \brief Apply the pending changes and return the target snapshot ID.
+  Result<int64_t> Apply();
+
+ private:
+  explicit SetSnapshot(std::shared_ptr<Transaction> transaction);
+
+  /// \brief Find the latest snapshot whose timestamp is before the provided timestamp.
+  ///
+  /// \param timestamp_ms Lookup snapshots before this timestamp
+  /// \return The snapshot ID that was current at the given timestamp, or nullopt
+  Result<std::optional<int64_t>> FindLatestAncestorOlderThan(int64_t timestamp_ms) const;
+
+  std::optional<int64_t> target_snapshot_id_;
+  bool is_rollback_{false};
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index c3b93be8..d3d5669c 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -69,6 +69,22 @@ Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
   return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
 }
 
+Result<bool> SnapshotUtil::IsAncestorOf(const TableMetadata& metadata,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
+  ICEBERG_CHECK(current != nullptr, "Current snapshot is null");
+
+  // Create a lookup function that uses the metadata
+  auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+    return metadata.SnapshotById(id);
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(current->snapshot_id, lookup));
+  return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) {
+    return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id;
+  });
+}
+
 Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id,
                                               int64_t ancestor_parent_snapshot_id) {
   ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
@@ -81,9 +97,18 @@ Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapsh
 
 Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
     const Table& table) {
-  auto current_result = table.current_snapshot();
-  ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {});
-  return AncestorsOf(table, current_result.value());
+  ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
+  return AncestorsOf(table, current);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const TableMetadata& metadata) {
+  ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
+  auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+    return metadata.SnapshotById(id);
+  };
+
+  return AncestorsOf(current, lookup);
 }
 
 Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& table) {
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index ca106cb3..0a000c69 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -70,6 +70,15 @@ class ICEBERG_EXPORT SnapshotUtil {
   /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
   static Result<bool> IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id);
 
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current
+  /// state.
+  ///
+  /// \param metadata The table metadata to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
+  static Result<bool> IsAncestorOf(const TableMetadata& metadata,
+                                   int64_t ancestor_snapshot_id);
+
   /// \brief Returns whether some ancestor of snapshot_id has parentId matches
   /// ancestor_parent_snapshot_id.
   ///
@@ -88,6 +97,14 @@ class ICEBERG_EXPORT SnapshotUtil {
   static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
       const Table& table);
 
+  /// \brief Returns a vector that traverses the metadata's snapshots from the current to
+  /// the last known ancestor.
+  ///
+  /// \param metadata The table metadata
+  /// \return A vector from the metadata's current snapshot to its last known ancestor
+  static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
+      const TableMetadata& metadata);
+
   /// \brief Returns the snapshot IDs for the ancestors of the current table state.
   ///
   /// Ancestor IDs are ordered by commit time, descending. The first ID is the current

Reply via email to