This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e6a7e3a feat: add SnapshotManager (#542)
7e6a7e3a is described below

commit 7e6a7e3a1ad4cb26e16b5883b36891f006ef1c80
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Feb 24 16:49:22 2026 +0800

    feat: add SnapshotManager (#542)
---
 src/iceberg/CMakeLists.txt                |   1 +
 src/iceberg/meson.build                   |   1 +
 src/iceberg/table.cc                      |   5 +
 src/iceberg/table.h                       |   3 +
 src/iceberg/test/CMakeLists.txt           |   2 +-
 src/iceberg/test/fast_append_test.cc      |   8 +-
 src/iceberg/test/set_snapshot_test.cc     | 155 -----------
 src/iceberg/test/snapshot_manager_test.cc | 437 ++++++++++++++++++++++++++++++
 src/iceberg/test/update_test_base.h       |  80 +++++-
 src/iceberg/transaction.cc                |  26 +-
 src/iceberg/transaction.h                 |  11 +-
 src/iceberg/type_fwd.h                    |   1 +
 src/iceberg/update/meson.build            |   1 +
 src/iceberg/update/snapshot_manager.cc    | 209 ++++++++++++++
 src/iceberg/update/snapshot_manager.h     | 203 ++++++++++++++
 src/iceberg/update/snapshot_update.cc     |  14 -
 src/iceberg/update/snapshot_update.h      |  23 +-
 17 files changed, 982 insertions(+), 198 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index ce995cd4..21e87bee 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -90,6 +90,7 @@ set(ICEBERG_SOURCES
     update/fast_append.cc
     update/pending_update.cc
     update/set_snapshot.cc
+    update/snapshot_manager.cc
     update/snapshot_update.cc
     update/update_location.cc
     update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 6c8adcf7..bfc502fd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -108,6 +108,7 @@ iceberg_sources = files(
     'update/fast_append.cc',
     'update/pending_update.cc',
     'update/set_snapshot.cc',
+    'update/snapshot_manager.cc',
     'update/snapshot_update.cc',
     'update/update_location.cc',
     'update/update_partition_spec.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index b6c26ea0..0550f61d 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -32,6 +32,7 @@
 #include "iceberg/table_scan.h"
 #include "iceberg/transaction.h"
 #include "iceberg/update/expire_snapshots.h"
+#include "iceberg/update/snapshot_manager.h"
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_partition_statistics.h"
 #include "iceberg/update/update_properties.h"
@@ -222,6 +223,10 @@ Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStat
   return transaction->NewUpdatePartitionStatistics();
 }
 
+Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
+  return SnapshotManager::Make(shared_from_this());
+}
+
 Result<std::shared_ptr<StagedTable>> StagedTable::Make(
     TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
     std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 1f3135dd..423911c2 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
   /// \brief Create a new FastAppend to append data files and commit the changes.
   virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();
 
+  /// \brief Create a new SnapshotManager to manage snapshots and snapshot references.
+  virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
+
  protected:
   Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
         std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 28e7cb19..fdd88888 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -179,7 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    SOURCES
                    expire_snapshots_test.cc
                    fast_append_test.cc
-                   set_snapshot_test.cc
+                   snapshot_manager_test.cc
                    transaction_test.cc
                    update_location_test.cc
                    update_partition_spec_test.cc
diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc
index 7c79d5e9..6c77fad1 100644
--- a/src/iceberg/test/fast_append_test.cc
+++ b/src/iceberg/test/fast_append_test.cc
@@ -39,10 +39,12 @@ class FastAppendTest : public UpdateTestBase {
  protected:
   static void SetUpTestSuite() { avro::RegisterAll(); }
 
+  std::string MetadataResource() const override {
+    return "TableMetadataV2ValidMinimal.json";
+  }
+
   void SetUp() override {
-    InitializeFileIO();
-    // Use minimal metadata for FastAppend tests
-    RegisterTableFromResource("TableMetadataV2ValidMinimal.json");
+    UpdateTestBase::SetUp();
 
     // Get partition spec and schema from the base table
     ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc
deleted file mode 100644
index 6bbd59b8..00000000
--- a/src/iceberg/test/set_snapshot_test.cc
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "iceberg/update/set_snapshot.h"
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include "iceberg/result.h"
-#include "iceberg/snapshot.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/test/update_test_base.h"
-#include "iceberg/transaction.h"
-
-namespace iceberg {
-
-class SetSnapshotTest : public UpdateTestBase {
- protected:
-  // Snapshot IDs from TableMetadataV2Valid.json
-  static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
-  static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
-
-  // Timestamps from TableMetadataV2Valid.json
-  static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
-  static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
-};
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot);
-
-  set_snapshot->SetCurrentSnapshot(kOldestSnapshotId);
-
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-
-  // Commit and verify the change was persisted
-  EXPECT_THAT(set_snapshot->Commit(), IsOk());
-  EXPECT_THAT(transaction->Commit(), IsOk());
-  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
-  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
-  EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId);
-
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Try to set to a non-existent snapshot
-  int64_t invalid_snapshot_id = 9999999999999999;
-  set_snapshot->SetCurrentSnapshot(invalid_snapshot_id);
-
-  auto result = set_snapshot->Apply();
-  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
-  EXPECT_THAT(result, HasErrorMessage("is not found"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToValid) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Rollback to the oldest snapshot (which is an ancestor)
-  set_snapshot->RollbackTo(kOldestSnapshotId);
-
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Try to rollback to a non-existent snapshot
-  int64_t invalid_snapshot_id = 9999999999999999;
-  set_snapshot->RollbackTo(invalid_snapshot_id);
-
-  auto result = set_snapshot->Apply();
-  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
-  EXPECT_THAT(result, HasErrorMessage("unknown snapshot id"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Rollback to a time between the two snapshots
-  // This should select the oldest snapshot
-  int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
-  set_snapshot->RollbackToTime(time_between);
-
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Try to rollback to a time before any snapshot
-  int64_t time_before_all = kOldestSnapshotTimestamp - 1000000;
-  set_snapshot->RollbackToTime(time_before_all);
-
-  // Should fail - no snapshot older than the specified time
-  auto result = set_snapshot->Apply();
-  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
-  EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than"));
-}
-
-TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  // Rollback to a timestamp just after the oldest snapshot
-  int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1;
-  set_snapshot->RollbackToTime(time_just_after_oldest);
-
-  // Apply and verify - should return the oldest snapshot
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kOldestSnapshotId);
-}
-
-TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
-  ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction());
-  ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot());
-  ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply());
-  EXPECT_EQ(snapshot_id, kCurrentSnapshotId);
-
-  EXPECT_THAT(set_snapshot->Commit(), IsOk());
-  EXPECT_THAT(transaction->Commit(), IsOk());
-  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
-  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
-  EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId);
-}
-
-}  // namespace iceberg
diff --git a/src/iceberg/test/snapshot_manager_test.cc b/src/iceberg/test/snapshot_manager_test.cc
new file mode 100644
index 00000000..bc00db39
--- /dev/null
+++ b/src/iceberg/test/snapshot_manager_test.cc
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_manager.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/snapshot.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+
+namespace iceberg {
+
+class SnapshotManagerTest : public UpdateTestBase {
+ protected:
+  void SetUp() override {
+    UpdateTestBase::SetUp();
+    ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot());
+    current_snapshot_id_ = current->snapshot_id;
+    ASSERT_FALSE(table_->snapshots().empty());
+    oldest_snapshot_id_ = table_->snapshots().front()->snapshot_id;
+  }
+
+  int64_t current_snapshot_id_{};
+  int64_t oldest_snapshot_id_{};
+};
+
+TEST_F(SnapshotManagerTest, CreateBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1");
+  ExpectCommitOk(manager->Commit());
+  ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, table_->NewSnapshotManager());
+  new_manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitError(new_manager->Commit(), ErrorKind::kCommitFailed,
+                    "branch 'branch1' was created concurrently");
+}
+
+TEST_F(SnapshotManagerTest, CreateTag) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, table_->NewSnapshotManager());
+  new_manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitError(new_manager->Commit(), ErrorKind::kCommitFailed,
+                    "tag 'tag1' was created concurrently");
+}
+
+TEST_F(SnapshotManagerTest, RemoveBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->RemoveBranch("branch1");
+  ExpectCommitOk(new_manager->Commit());
+  ExpectNoRef("branch1");
+}
+
+TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RemoveBranch("non-existing");
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "Branch does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, RemovingMainBranchFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RemoveBranch(std::string(SnapshotRef::kMainBranch));
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "Cannot remove main branch");
+}
+
+TEST_F(SnapshotManagerTest, RemoveTag) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->RemoveTag("tag1");
+  ExpectCommitOk(new_manager->Commit());
+  ExpectNoRef("tag1");
+}
+
+TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RemoveTag("non-existing");
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "Tag does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", oldest_snapshot_id_);
+  manager->CreateBranch("branch2", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->ReplaceBranch("branch1", "branch2");
+  ExpectCommitOk(new_manager->Commit());
+  ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->ReplaceBranch("branch1", "non-existing");
+  ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+                    "Ref does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->ReplaceBranch("new-branch", "branch1");
+  ExpectCommitOk(new_manager->Commit());
+  ExpectBranch("new-branch", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->FastForwardBranch("new-branch", "branch1");
+  ExpectCommitOk(new_manager->Commit());
+  ExpectBranch("new-branch", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->FastForwardBranch("branch1", "non-existing");
+  ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+                    "Ref does not exist: non-existing");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceTag) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->ReplaceTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(new_manager->Commit());
+  ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, UpdatingBranchRetention) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+    ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+    new_manager->SetMinSnapshotsToKeep("branch1", 10);
+    new_manager->SetMaxSnapshotAgeMs("branch1", 20000);
+    ExpectCommitOk(new_manager->Commit());
+  }
+
+  auto metadata = ReloadMetadata();
+  auto ref = metadata->refs.at("branch1");
+  EXPECT_EQ(ref->type(), SnapshotRefType::kBranch);
+  const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
+  EXPECT_EQ(branch.max_snapshot_age_ms, 20000);
+  EXPECT_EQ(branch.min_snapshots_to_keep, 10);
+}
+
+TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+    ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+    new_manager->SetMinSnapshotsToKeep("tag1", 10);
+    ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+                      "Ref 'tag1' is a tag not a branch");
+  }
+
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+    ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+    new_manager->SetMaxSnapshotAgeMs("tag1", 10);
+    ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+                      "Ref 'tag1' is a tag not a branch");
+  }
+}
+
+TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->SetMaxRefAgeMs("branch1", 10000);
+  ExpectCommitOk(new_manager->Commit());
+
+  EXPECT_EQ(ReloadMetadata()->refs.at("branch1")->max_ref_age_ms(), 10000);
+}
+
+TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->SetMaxRefAgeMs("tag1", 10000);
+  ExpectCommitOk(new_manager->Commit());
+
+  EXPECT_EQ(ReloadMetadata()->refs.at("tag1")->max_ref_age_ms(), 10000);
+}
+
+TEST_F(SnapshotManagerTest, RenameBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->RenameBranch("branch1", "branch2");
+  ExpectCommitOk(new_manager->Commit());
+
+  ExpectNoRef("branch1");
+  ExpectBranch("branch2", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, FailRenamingMainBranch) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch");
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "Cannot rename main branch");
+}
+
+TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RenameBranch("some-missing-branch", "some-branch");
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "Branch does not exist: some-missing-branch");
+}
+
+TEST_F(SnapshotManagerTest, RollbackToTime) {
+  // The oldest snapshot has timestamp 1515100955770, the current has 1555100955770.
+  // Rolling back to a time between them should land on the oldest snapshot.
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RollbackToTime(1535100955770);
+  ExpectCommitOk(manager->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RollbackToTimeBeforeAllSnapshotsFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RollbackToTime(1000);
+  ExpectCommitError(manager->Commit(), ErrorKind::kValidationFailed,
+                    "no valid snapshot older than");
+}
+
+TEST_F(SnapshotManagerTest, ReplaceBranchBySnapshotId) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", oldest_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->ReplaceBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(new_manager->Commit());
+  ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RefUpdatesFlushedBeforeSnapshotOperation) {
+  // Interleave a ref operation (CreateBranch) with a snapshot operation
+  // (SetCurrentSnapshot). The ref updates should be flushed before the snapshot
+  // operation is applied, so both should take effect.
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  manager->SetCurrentSnapshot(oldest_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectBranch("branch1", current_snapshot_id_);
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SnapshotOperationThenRefUpdates) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->SetCurrentSnapshot(oldest_snapshot_id_);
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+  ExpectBranch("branch1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, RollbackTo) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->RollbackTo(oldest_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SetCurrentSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->SetCurrentSnapshot(oldest_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1", current_snapshot_id_);
+  manager->CreateTag("tag1", current_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, reloaded->NewSnapshotManager());
+  new_manager->RollbackTo(oldest_snapshot_id_);
+  ExpectCommitOk(new_manager->Commit());
+
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+  ExpectBranch("branch1", current_snapshot_id_);
+  ExpectTag("tag1", current_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) {
+  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn));
+  manager->RollbackTo(oldest_snapshot_id_);
+  ExpectCommitOk(txn->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest, SnapshotManagerFromTableAllowsMultipleSnapshotOperations) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->SetCurrentSnapshot(oldest_snapshot_id_);
+  manager->SetCurrentSnapshot(current_snapshot_id_);
+  manager->RollbackTo(oldest_snapshot_id_);
+  ExpectCommitOk(manager->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+TEST_F(SnapshotManagerTest,
+       SnapshotManagerFromTransactionAllowsMultipleSnapshotOperations) {
+  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn));
+  manager->SetCurrentSnapshot(oldest_snapshot_id_);
+  manager->SetCurrentSnapshot(current_snapshot_id_);
+  manager->RollbackTo(oldest_snapshot_id_);
+  ExpectCommitOk(txn->Commit());
+  ExpectCurrentSnapshot(oldest_snapshot_id_);
+}
+
+class SnapshotManagerMinimalTableTest : public MinimalUpdateTestBase {};
+
+TEST_F(SnapshotManagerMinimalTableTest, CreateBranchOnEmptyTable) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1");
+  ExpectCommitOk(manager->Commit());
+
+  auto metadata = ReloadMetadata();
+  EXPECT_FALSE(metadata->refs.contains(std::string(SnapshotRef::kMainBranch)));
+  auto it = metadata->refs.find("branch1");
+  ASSERT_NE(it, metadata->refs.end());
+  EXPECT_EQ(it->second->type(), SnapshotRefType::kBranch);
+}
+
+TEST_F(SnapshotManagerMinimalTableTest,
+       CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) {
+  ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
+  manager->CreateBranch("branch1");
+  ExpectCommitOk(manager->Commit());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_manager, table_with_branch->NewSnapshotManager());
+  new_manager->CreateBranch("branch1");
+  ExpectCommitError(new_manager->Commit(), ErrorKind::kValidationFailed,
+                    "Ref branch1 already exists");
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h
index c14cb76b..310feb37 100644
--- a/src/iceberg/test/update_test_base.h
+++ b/src/iceberg/test/update_test_base.h
@@ -28,6 +28,8 @@
 
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
 #include "iceberg/table.h"
 #include "iceberg/table_identifier.h"
 #include "iceberg/table_metadata.h"
@@ -37,12 +39,18 @@
 
 namespace iceberg {
 
-// Base test fixture for table update operations
+/// \brief Base test fixture for table update operations.
 class UpdateTestBase : public ::testing::Test {
  protected:
+  virtual std::string MetadataResource() const { return "TableMetadataV2Valid.json"; }
+  virtual std::string TableName() const { return "test_table"; }
+
   void SetUp() override {
+    table_ident_ = TableIdentifier{.name = TableName()};
+    table_location_ = "/warehouse/" + TableName();
+
     InitializeFileIO();
-    RegisterTableFromResource("TableMetadataV2Valid.json");
+    RegisterTableFromResource(MetadataResource());
   }
 
   /// \brief Initialize file IO and create necessary directories.
@@ -62,10 +70,8 @@ class UpdateTestBase : public ::testing::Test {
   ///
   /// \param resource_name The name of the metadata resource file
   void RegisterTableFromResource(const std::string& resource_name) {
-    // Drop existing table if it exists
     std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
 
-    // Write table metadata to the table location.
     auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
                                          table_location_, 
Uuid::GenerateV7().ToString());
     ICEBERG_UNWRAP_OR_FAIL(auto metadata, 
ReadTableMetadataFromResource(resource_name));
@@ -73,16 +79,76 @@ class UpdateTestBase : public ::testing::Test {
     ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, 
*metadata),
                 IsOk());
 
-    // Register the table in the catalog.
     ICEBERG_UNWRAP_OR_FAIL(table_,
                            catalog_->RegisterTable(table_ident_, 
metadata_location));
   }
 
-  const TableIdentifier table_ident_{.name = "test_table"};
-  const std::string table_location_{"/warehouse/test_table"};
+  /// \brief Reload the table from catalog and return its metadata.
+  std::shared_ptr<TableMetadata> ReloadMetadata() {
+    auto result = catalog_->LoadTable(table_ident_);
+    EXPECT_TRUE(result.has_value()) << "Failed to reload table";
+    return result.value()->metadata();
+  }
+
+  /// \brief Assert that a ref exists with the given type and snapshot id.
+  void ExpectRef(const std::string& name, SnapshotRefType type, int64_t 
snapshot_id) {
+    auto metadata = ReloadMetadata();
+    auto it = metadata->refs.find(name);
+    ASSERT_NE(it, metadata->refs.end()) << "Ref not found: " << name;
+    EXPECT_EQ(it->second->type(), type);
+    EXPECT_EQ(it->second->snapshot_id, snapshot_id);
+  }
+
+  void ExpectBranch(const std::string& name, int64_t snapshot_id) {
+    ExpectRef(name, SnapshotRefType::kBranch, snapshot_id);
+  }
+
+  void ExpectTag(const std::string& name, int64_t snapshot_id) {
+    ExpectRef(name, SnapshotRefType::kTag, snapshot_id);
+  }
+
+  /// \brief Assert that a ref does not exist.
+  void ExpectNoRef(const std::string& name) {
+    auto metadata = ReloadMetadata();
+    EXPECT_FALSE(metadata->refs.contains(name)) << "Ref should not exist: " << 
name;
+  }
+
+  /// \brief Assert the current snapshot id after reloading.
+  void ExpectCurrentSnapshot(int64_t snapshot_id) {
+    auto result = catalog_->LoadTable(table_ident_);
+    ASSERT_TRUE(result.has_value());
+    auto snap_result = result.value()->current_snapshot();
+    ASSERT_TRUE(snap_result.has_value());
+    EXPECT_EQ(snap_result.value()->snapshot_id, snapshot_id);
+  }
+
+  /// \brief Assert that a commit succeeded.
+  template <typename T>
+  void ExpectCommitOk(const T& result) {
+    EXPECT_THAT(result, IsOk());
+  }
+
+  /// \brief Assert that a commit failed with the given error kind and message 
substring.
+  template <typename T>
+  void ExpectCommitError(const T& result, ErrorKind kind, const std::string& 
message) {
+    EXPECT_THAT(result, IsError(kind));
+    EXPECT_THAT(result, HasErrorMessage(message));
+  }
+
+  TableIdentifier table_ident_;
+  std::string table_location_;
   std::shared_ptr<FileIO> file_io_;
   std::shared_ptr<InMemoryCatalog> catalog_;
   std::shared_ptr<Table> table_;
 };
 
+/// \brief Test fixture for table update operations on minimal table metadata.
+class MinimalUpdateTestBase : public UpdateTestBase {
+ protected:
+  std::string MetadataResource() const override {
+    return "TableMetadataV2ValidMinimal.json";
+  }
+  std::string TableName() const override { return "minimal_table"; }
+};
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index b24aa0da..6df45b30 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -36,6 +36,7 @@
 #include "iceberg/update/fast_append.h"
 #include "iceberg/update/pending_update.h"
 #include "iceberg/update/set_snapshot.h"
+#include "iceberg/update/snapshot_manager.h"
 #include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_location.h"
 #include "iceberg/update/update_partition_spec.h"
@@ -62,9 +63,7 @@ Transaction::~Transaction() = default;
 
 Result<std::shared_ptr<Transaction>> Transaction::Make(std::shared_ptr<Table> 
table,
                                                        Kind kind, bool 
auto_commit) {
-  if (!table || !table->catalog()) [[unlikely]] {
-    return InvalidArgument("Table and catalog cannot be null");
-  }
+  ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be 
null");
 
   std::unique_ptr<TableMetadataBuilder> metadata_builder;
   if (kind == Kind::kCreate) {
@@ -93,9 +92,9 @@ std::string 
Transaction::MetadataFileLocation(std::string_view filename) const {
 }
 
 Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
-  if (!last_update_committed_) {
-    return InvalidArgument("Cannot add update when previous update is not 
committed");
-  }
+  ICEBERG_CHECK(last_update_committed_,
+                "Cannot add update when previous update is not committed");
+
   pending_updates_.emplace_back(std::weak_ptr<PendingUpdate>(update));
   last_update_committed_ = false;
   return {};
@@ -301,13 +300,9 @@ Status 
Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& up
 }
 
 Result<std::shared_ptr<Table>> Transaction::Commit() {
-  if (committed_) {
-    return Invalid("Transaction already committed");
-  }
-  if (!last_update_committed_) {
-    return InvalidArgument(
-        "Cannot commit transaction when previous update is not committed");
-  }
+  ICEBERG_CHECK(!committed_, "Transaction already committed");
+  ICEBERG_CHECK(last_update_committed_,
+                "Cannot commit transaction when previous update is not 
committed");
 
   const auto& updates = metadata_builder_->changes();
   if (updates.empty()) {
@@ -428,4 +423,9 @@ Transaction::NewUpdateSnapshotReference() {
   return update_ref;
 }
 
+Result<std::shared_ptr<SnapshotManager>> Transaction::NewSnapshotManager() {
+  // SnapshotManager has its own commit logic, so it is not added to the 
pending updates.
+  return SnapshotManager::Make(shared_from_this());
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index e975be7f..438054b5 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -94,13 +94,16 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// changes.
   Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
 
+  /// \brief Create a new FastAppend to append data files and commit the 
changes.
+  Result<std::shared_ptr<FastAppend>> NewFastAppend();
+
+  /// \brief Create a new SnapshotManager to manage snapshots.
+  Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
+
   /// \brief Create a new SetSnapshot to set the current snapshot or rollback 
to a
   /// previous snapshot and commit the changes.
   Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
 
-  /// \brief Create a new FastAppend to append data files and commit the 
changes.
-  Result<std::shared_ptr<FastAppend>> NewFastAppend();
-
   /// \brief Create a new UpdateSnapshotReference to update snapshot 
references (branches
   /// and tags) and commit the changes.
   Result<std::shared_ptr<UpdateSnapshotReference>> 
NewUpdateSnapshotReference();
@@ -136,7 +139,7 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   const Kind kind_;
   // Whether to auto-commit the transaction when updates are applied.
   // This is useful when a temporary transaction is created for a single 
operation.
-  const bool auto_commit_;
+  bool auto_commit_;
   // To make the state simple, we require updates are added and committed in 
order.
   bool last_update_committed_ = true;
   // Tracks if transaction has been committed to prevent double-commit
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index e97de0ac..7a3f50df 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -193,6 +193,7 @@ class ExpireSnapshots;
 class FastAppend;
 class PendingUpdate;
 class SetSnapshot;
+class SnapshotManager;
 class SnapshotUpdate;
 class UpdateLocation;
 class UpdatePartitionSpec;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 102471c0..6acb007a 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -21,6 +21,7 @@ install_headers(
         'fast_append.h',
         'pending_update.h',
         'set_snapshot.h',
+        'snapshot_manager.h',
         'snapshot_update.h',
         'update_location.h',
         'update_partition_spec.h',
diff --git a/src/iceberg/update/snapshot_manager.cc 
b/src/iceberg/update/snapshot_manager.cc
new file mode 100644
index 00000000..d882dd32
--- /dev/null
+++ b/src/iceberg/update/snapshot_manager.cc
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_manager.h"
+
+#include "iceberg/result.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/set_snapshot.h"
+#include "iceberg/update/update_snapshot_reference.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
+    std::shared_ptr<Table> table) {
+  ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null");
+  ICEBERG_ASSIGN_OR_RAISE(auto transaction,
+                          Transaction::Make(std::move(table), 
Transaction::Kind::kUpdate,
+                                            /*auto_commit=*/false));
+  return std::shared_ptr<SnapshotManager>(
+      new SnapshotManager(std::move(transaction), 
/*is_external_transaction=*/false));
+}
+
+Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
+  return std::shared_ptr<SnapshotManager>(
+      new SnapshotManager(std::move(transaction), 
/*is_external_transaction=*/true));
+}
+
+SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction,
+                                 bool is_external_transaction)
+    : transaction_(std::move(transaction)),
+      is_external_transaction_(is_external_transaction) {}
+
+SnapshotManager::~SnapshotManager() = default;
+
+SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  // TODO(anyone): Implement cherrypick operation
+  ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented");
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, 
transaction_->NewSetSnapshot());
+  set_snapshot->SetCurrentSnapshot(snapshot_id);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RollbackToTime(int64_t timestamp_ms) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, 
transaction_->NewSetSnapshot());
+  set_snapshot->RollbackToTime(timestamp_ms);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, 
transaction_->NewSetSnapshot());
+  set_snapshot->RollbackTo(snapshot_id);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
+  const auto& base = transaction_->current();
+  if (base.current_snapshot_id != kInvalidSnapshotId) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base.Snapshot());
+    ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not 
be null");
+    return CreateBranch(name, current_snapshot->snapshot_id);
+  }
+  ICEBERG_BUILDER_CHECK(!base.refs.contains(name), "Ref {} already exists", 
name);
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, 
transaction_->NewFastAppend());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->SetTargetBranch(name).Commit());
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name,
+                                               int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->CreateBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::CreateTag(const std::string& name,
+                                            int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->CreateTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->RemoveBranch(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->RemoveTag(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name,
+                                             int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->ReplaceTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name,
+                                                int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->ReplaceBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from,
+                                                const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->ReplaceBranch(from, to);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from,
+                                                    const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->FastForward(from, to);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RenameBranch(const std::string& name,
+                                               const std::string& new_name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->RenameBranch(name, new_name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& 
branch_name,
+                                                        int32_t 
min_snapshots_to_keep) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& 
branch_name,
+                                                      int64_t 
max_snapshot_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
+                                                 int64_t max_ref_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, 
UpdateSnapshotReferencesOperation());
+  update_ref->SetMaxRefAgeMs(name, max_ref_age_ms);
+  return *this;
+}
+
+Status SnapshotManager::Commit() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
+  if (!is_external_transaction_) {
+    ICEBERG_RETURN_UNEXPECTED(transaction_->Commit());
+  }
+  return {};
+}
+
+Result<std::shared_ptr<UpdateSnapshotReference>>
+SnapshotManager::UpdateSnapshotReferencesOperation() {
+  if (update_snap_refs_ == nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(update_snap_refs_,
+                            transaction_->NewUpdateSnapshotReference());
+  }
+  return update_snap_refs_;
+}
+
+Status SnapshotManager::CommitIfRefUpdatesExist() {
+  if (update_snap_refs_ != nullptr) {
+    ICEBERG_RETURN_UNEXPECTED(update_snap_refs_->Commit());
+    update_snap_refs_ = nullptr;
+  }
+  return {};
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_manager.h 
b/src/iceberg/update/snapshot_manager.h
new file mode 100644
index 00000000..fd81f833
--- /dev/null
+++ b/src/iceberg/update/snapshot_manager.h
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+
+/// \file iceberg/update/snapshot_manager.h
+/// \brief API for managing snapshots and snapshot references.
+
+namespace iceberg {
+
+/// \brief API for managing snapshots.
+///
+/// Allows rolling table data back to a state at an older table snapshot.
+///
+/// Rollback: This API does not allow conflicting calls to SetCurrentSnapshot 
and
+/// RollbackToTime. When committing, these changes will be applied to the 
current table
+/// metadata. Commit conflicts will not be resolved and will result in a failed commit.
+///
+/// Cherrypick: In an audit workflow, new data is written to an orphan 
snapshot that is
+/// not committed as the table's current state until it is audited. After 
auditing a
+/// change, it may need to be applied or cherry-picked on top of the latest 
snapshot
+/// instead of the one that was current when the audited changes were created. 
This class
+/// adds support for cherry-picking the changes from an orphan snapshot by 
applying them
+/// to the current snapshot. The output of the operation is a new snapshot 
with the
+/// changes from cherry-picked snapshot.
+class ICEBERG_EXPORT SnapshotManager : public ErrorCollector {
+ public:
+  /// \brief Create a SnapshotManager that owns its own transaction.
+  static Result<std::shared_ptr<SnapshotManager>> Make(std::shared_ptr<Table> 
table);
+
+  /// \brief Create a SnapshotManager from an existing transaction.
+  ///
+  /// \note The caller is responsible for committing the transaction.
+  static Result<std::shared_ptr<SnapshotManager>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~SnapshotManager() override;
+
+  /// \brief Apply supported changes in given snapshot and create a new 
snapshot which
+  /// will be set as the current snapshot on commit.
+  ///
+  /// \param snapshot_id a Snapshot ID whose changes to apply
+  /// \return Reference to this for method chaining
+  SnapshotManager& Cherrypick(int64_t snapshot_id);
+
+  /// \brief Roll this table's data back to a specific snapshot identified by 
id.
+  ///
+  /// \param snapshot_id long id of the snapshot to roll back table data to
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id);
+
+  /// \brief Roll this table's data back to the last snapshot before the given 
timestamp.
+  ///
+  /// \param timestamp_ms a long timestamp in milliseconds
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackToTime(int64_t timestamp_ms);
+
+  /// \brief Rollback table's state to a specific snapshot identified by id.
+  ///
+  /// \param snapshot_id long id of snapshot to roll back table to. Must be an 
ancestor
+  /// of the current snapshot
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackTo(int64_t snapshot_id);
+
+  /// \brief Create a new branch. The branch will point to current snapshot if 
the
+  /// current snapshot is not NULL. Otherwise, the branch will point to a 
newly created
+  /// empty snapshot.
+  ///
+  /// \param name branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name);
+
+  /// \brief Create a new branch pointing to the given snapshot id.
+  ///
+  /// \param name branch name
+  /// \param snapshot_id id of the snapshot which will be the head of the 
branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Create a new tag pointing to the given snapshot id.
+  ///
+  /// \param name tag name
+  /// \param snapshot_id snapshot id for the head of the new tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Remove a branch by name.
+  ///
+  /// \param name branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveBranch(const std::string& name);
+
+  /// \brief Remove the tag with the given name.
+  ///
+  /// \param name tag name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveTag(const std::string& name);
+
+  /// \brief Replace the tag with the given name to point to the specified 
snapshot.
+  ///
+  /// \param name tag to replace
+  /// \param snapshot_id new snapshot id for the given tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace the branch with the given name to point to the specified 
snapshot.
+  ///
+  /// \param name branch to replace
+  /// \param snapshot_id new snapshot id for the given branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace the 'from' branch to point to the 'to' snapshot. The 'to' 
will
+  /// remain unchanged, and 'from' branch will retain its retention 
properties. If the
+  /// 'from' branch does not exist, it will be created with default retention 
properties.
+  ///
+  /// \param from branch to replace
+  /// \param to the branch 'from' should be replaced with
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& from, const std::string& 
to);
+
+  /// \brief Perform a fast-forward of 'from' up to the 'to' snapshot if 
'from' is an
+  /// ancestor of 'to'. The 'to' will remain unchanged, and 'from' will retain 
its
+  /// retention properties. If the 'from' branch does not exist, it will be 
created with
+  /// default retention properties.
+  ///
+  /// \param from branch to fast-forward
+  /// \param to ref for the 'from' branch to be fast forwarded to
+  /// \return Reference to this for method chaining
+  SnapshotManager& FastForwardBranch(const std::string& from, const 
std::string& to);
+
+  /// \brief Rename a branch.
+  ///
+  /// \param name name of branch to rename
+  /// \param new_name the desired new name of the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& RenameBranch(const std::string& name, const std::string& 
new_name);
+
+  /// \brief Update the minimum number of snapshots to keep for a branch.
+  ///
+  /// \param branch_name branch name
+  /// \param min_snapshots_to_keep minimum number of snapshots to retain on 
the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name,
+                                         int32_t min_snapshots_to_keep);
+
+  /// \brief Update the max snapshot age for a branch.
+  ///
+  /// \param branch_name branch name
+  /// \param max_snapshot_age_ms maximum snapshot age in milliseconds to 
retain on branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name,
+                                       int64_t max_snapshot_age_ms);
+
+  /// \brief Update the retention policy for a reference.
+  ///
+  /// \param name name of the reference (branch or tag)
+  /// \param max_ref_age_ms retention age in milliseconds of the reference 
itself
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t 
max_ref_age_ms);
+
+  /// \brief Commit all pending changes.
+  Status Commit();
+
+ private:
+  SnapshotManager(std::shared_ptr<Transaction> transaction, bool 
is_external_transaction);
+
+  /// \brief Get or create the UpdateSnapshotReference operation.
+  Result<std::shared_ptr<UpdateSnapshotReference>> 
UpdateSnapshotReferencesOperation();
+
+  /// \brief Commit any pending reference updates if they exist.
+  Status CommitIfRefUpdatesExist();
+
+  std::shared_ptr<Transaction> transaction_;
+  const bool is_external_transaction_;
+  std::shared_ptr<UpdateSnapshotReference> update_snap_refs_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.cc 
b/src/iceberg/update/snapshot_update.cc
index 4865278f..b4468256 100644
--- a/src/iceberg/update/snapshot_update.cc
+++ b/src/iceberg/update/snapshot_update.cc
@@ -331,20 +331,6 @@ Status SnapshotUpdate::Finalize(std::optional<Error> 
commit_error) {
   return {};
 }
 
-Status SnapshotUpdate::SetTargetBranch(const std::string& branch) {
-  ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty");
-
-  if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) {
-    ICEBERG_PRECHECK(
-        ref_it->second->type() == SnapshotRefType::kBranch,
-        "{} is a tag, not a branch. Tags cannot be targets for producing 
snapshots",
-        branch);
-  }
-
-  target_branch_ = branch;
-  return {};
-}
-
 Result<std::unordered_map<std::string, std::string>> 
SnapshotUpdate::ComputeSummary(
     const TableMetadata& previous) {
   std::unordered_map<std::string, std::string> summary = Summary();
diff --git a/src/iceberg/update/snapshot_update.h 
b/src/iceberg/update/snapshot_update.h
index 12c3b19d..fdbb2660 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -76,6 +76,28 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
     return self;
   }
 
+  /// \brief Perform operations on a particular branch
+  ///
+  /// \param branch Name of the target branch (a SnapshotRef of type branch)
+  /// \return Reference to this for method chaining
+  auto& SetTargetBranch(this auto& self, const std::string& branch) {
+    if (branch.empty()) [[unlikely]] {
+      return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be 
empty");
+    }
+
+    if (auto ref_it = self.base().refs.find(branch); ref_it != 
self.base().refs.end()) {
+      if (ref_it->second->type() != SnapshotRefType::kBranch) {
+        return self.AddError(ErrorKind::kInvalidArgument,
+                             "{} is a tag, not a branch. Tags cannot be 
targets for "
+                             "producing snapshots",
+                             branch);
+      }
+    }
+
+    self.target_branch_ = branch;
+    return self;
+  }
+
   /// \brief Set a summary property.
   ///
   /// \param property The property name
@@ -121,7 +143,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
       std::span<const std::shared_ptr<DataFile>> files,
       const std::shared_ptr<PartitionSpec>& spec);
 
-  Status SetTargetBranch(const std::string& branch);
   const std::string& target_branch() const { return target_branch_; }
   bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
   const std::string& commit_uuid() const { return commit_uuid_; }

Reply via email to