This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b937acb6 feat: support expire snapshots (#490)
b937acb6 is described below

commit b937acb6812dca085e7b64a034e052607b92ef92
Author: dongxiao <[email protected]>
AuthorDate: Wed Jan 14 15:22:46 2026 +0800

    feat: support expire snapshots (#490)
    
    This PR is part of the effort to implement snapshot expiration, as
    described in issue https://github.com/apache/iceberg-cpp/issues/364.
    
    TODO: File recycling will be added in a follow-up PR.
    
    ---------
    
    Co-authored-by: Gang Wu <[email protected]>
---
 src/iceberg/CMakeLists.txt                      |   1 +
 src/iceberg/meson.build                         |   1 +
 src/iceberg/snapshot.cc                         |  13 ++
 src/iceberg/snapshot.h                          |   2 +
 src/iceberg/table.cc                            |   8 +
 src/iceberg/table.h                             |   4 +
 src/iceberg/table_metadata.cc                   |  92 +++++++-
 src/iceberg/table_update.cc                     |   8 +-
 src/iceberg/test/CMakeLists.txt                 |   1 +
 src/iceberg/test/expire_snapshots_test.cc       |  68 ++++++
 src/iceberg/test/table_metadata_builder_test.cc | 131 ++++++++++-
 src/iceberg/transaction.cc                      |  27 +++
 src/iceberg/transaction.h                       |   4 +
 src/iceberg/type_fwd.h                          |   1 +
 src/iceberg/update/expire_snapshots.cc          | 292 ++++++++++++++++++++++++
 src/iceberg/update/expire_snapshots.h           | 174 ++++++++++++++
 src/iceberg/update/pending_update.h             |   1 +
 src/iceberg/util/snapshot_util.cc               |   8 +
 src/iceberg/util/snapshot_util_internal.h       |   9 +
 19 files changed, 835 insertions(+), 10 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 23d2e4cd..86a0efd7 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
     transform.cc
     transform_function.cc
     type.cc
+    update/expire_snapshots.cc
     update/pending_update.cc
     update/snapshot_update.cc
     update/update_partition_spec.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index ead2ef2c..87f508cd 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -102,6 +102,7 @@ iceberg_sources = files(
     'transform.cc',
     'transform_function.cc',
     'type.cc',
+    'update/expire_snapshots.cc',
     'update/pending_update.cc',
     'update/snapshot_update.cc',
     'update/update_partition_spec.cc',
diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc
index dfa9a340..e4edd7b6 100644
--- a/src/iceberg/snapshot.cc
+++ b/src/iceberg/snapshot.cc
@@ -52,6 +52,19 @@ SnapshotRefType SnapshotRef::type() const noexcept {
       retention);
 }
 
+/// \brief Return the max_ref_age_ms retention setting of this ref.
+///
+/// Every alternative of the `retention` variant exposes a `max_ref_age_ms`
+/// member, so one generic visitor covers them all; no per-type branching (and
+/// no capture) is needed.
+std::optional<int64_t> SnapshotRef::max_ref_age_ms() const noexcept {
+  return std::visit(
+      [](const auto& r) -> std::optional<int64_t> { return r.max_ref_age_ms; },
+      retention);
+}
+
 Status SnapshotRef::Validate() const {
   if (type() == SnapshotRefType::kBranch) {
     const auto& branch = std::get<Branch>(this->retention);
diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h
index e2ec0ccb..65bf2f83 100644
--- a/src/iceberg/snapshot.h
+++ b/src/iceberg/snapshot.h
@@ -113,6 +113,8 @@ struct ICEBERG_EXPORT SnapshotRef {
 
   SnapshotRefType type() const noexcept;
 
+  /// \brief Return the max_ref_age_ms retention value held by this ref's
+  /// retention settings, or std::nullopt when the ref does not set one.
+  std::optional<int64_t> max_ref_age_ms() const noexcept;
+
   /// \brief Create a branch reference
   ///
   /// \param snapshot_id The snapshot ID for the branch
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index f5aacd82..c79ac53f 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -31,6 +31,7 @@
 #include "iceberg/table_properties.h"
 #include "iceberg/table_scan.h"
 #include "iceberg/transaction.h"
+#include "iceberg/update/expire_snapshots.h"
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
@@ -184,6 +185,13 @@ Result<std::shared_ptr<UpdateSchema>> 
Table::NewUpdateSchema() {
   return transaction->NewUpdateSchema();
 }
 
+/// Opens a kUpdate transaction with auto_commit enabled and returns an
+/// ExpireSnapshots update created from it.
+Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
+  ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(shared_from_this(),
+                                                      Transaction::Kind::kUpdate,
+                                                      /*auto_commit=*/true));
+  return txn->NewExpireSnapshots();
+}
+
 Result<std::shared_ptr<StagedTable>> StagedTable::Make(
     TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
     std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 7727f845..cc948248 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -147,6 +147,10 @@ class ICEBERG_EXPORT Table : public 
std::enable_shared_from_this<Table> {
   /// changes.
   virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
 
+  /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit
+  /// the changes.
+  ///
+  /// The update is bound to a new transaction created with auto_commit enabled.
+  virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+
  protected:
   Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
         std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 22eb739b..7e357bba 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -617,6 +617,9 @@ class TableMetadataBuilder::Impl {
   Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
   Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const 
std::string& branch);
   Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
+  Status RemoveRef(const std::string& name);
+  Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
+  Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
 
   Result<std::unique_ptr<TableMetadata>> Build();
 
@@ -1334,6 +1337,84 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
   return new_schema_id;
 }
 
+/// Removes the ref called `name`. A RemoveSnapshotRef change is recorded only
+/// when such a ref actually existed. Removing the main branch also resets the
+/// current snapshot id, even when no main ref was present.
+Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
+  const bool is_main_branch = (name == SnapshotRef::kMainBranch);
+  if (is_main_branch) {
+    metadata_.current_snapshot_id = kInvalidSnapshotId;
+  }
+
+  const auto removed_count = metadata_.refs.erase(name);
+  if (removed_count > 0) {
+    changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
+  }
+  return {};
+}
+
+/// Removes every snapshot whose id is listed in `snapshot_ids`.
+///
+/// Unknown and duplicate ids are tolerated. A RemoveSnapshots change is recorded
+/// only for the ids that actually matched a snapshot. Afterwards, every ref
+/// whose target id is no longer in snapshots_by_id_ is dropped via RemoveRef().
+Status TableMetadataBuilder::Impl::RemoveSnapshots(
+    const std::vector<int64_t>& snapshot_ids) {
+  if (snapshot_ids.empty()) {
+    return {};
+  }
+
+  const std::unordered_set<int64_t> ids_to_remove(snapshot_ids.begin(),
+                                                  snapshot_ids.end());
+  std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
+  // Do not reserve `snapshots.size() - snapshot_ids.size()`: `snapshot_ids` may
+  // hold duplicates or unknown ids, and the unsigned subtraction would wrap
+  // around, making reserve() throw std::length_error.
+  retained_snapshots.reserve(metadata_.snapshots.size());
+  std::vector<int64_t> snapshot_ids_to_remove;
+  snapshot_ids_to_remove.reserve(ids_to_remove.size());
+
+  for (auto& snapshot : metadata_.snapshots) {
+    ICEBERG_CHECK(snapshot != nullptr, "Encountered null snapshot in metadata");
+    const int64_t snapshot_id = snapshot->snapshot_id;
+    if (ids_to_remove.contains(snapshot_id)) {
+      snapshots_by_id_.erase(snapshot_id);
+      snapshot_ids_to_remove.push_back(snapshot_id);
+      // FIXME: implement statistics removal and uncomment below
+      // ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
+      // ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
+    } else {
+      retained_snapshots.push_back(std::move(snapshot));
+    }
+  }
+
+  if (!snapshot_ids_to_remove.empty()) {
+    changes_.push_back(
+        std::make_unique<table::RemoveSnapshots>(snapshot_ids_to_remove));
+  }
+
+  metadata_.snapshots = std::move(retained_snapshots);
+
+  // Remove any refs that are no longer valid (dangling refs). Names are
+  // collected first because RemoveRef() erases from metadata_.refs.
+  std::vector<std::string> dangling_refs;
+  for (const auto& [ref_name, ref] : metadata_.refs) {
+    if (!snapshots_by_id_.contains(ref->snapshot_id)) {
+      dangling_refs.push_back(ref_name);
+    }
+  }
+  for (const auto& ref_name : dangling_refs) {
+    ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name));
+  }
+
+  return {};
+}
+
+/// Drops every partition spec whose id appears in `spec_ids` and records a
+/// RemovePartitionSpecs change for the requested ids (even if none matched).
+/// Fails if the default spec is among the ids.
+Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
+    const std::vector<int32_t>& spec_ids) {
+  if (spec_ids.empty()) {
+    return {};
+  }
+
+  const std::unordered_set<int32_t> spec_ids_to_drop(spec_ids.begin(), spec_ids.end());
+  ICEBERG_PRECHECK(!spec_ids_to_drop.contains(metadata_.default_spec_id),
+                   "Cannot remove the default partition spec");
+
+  // Filter in place; surviving specs keep their relative order.
+  std::erase_if(metadata_.partition_specs, [&spec_ids_to_drop](const auto& spec) {
+    return spec_ids_to_drop.contains(spec->spec_id());
+  });
+  changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
+
+  return {};
+}
+
 TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
     : impl_(std::make_unique<Impl>(format_version)) {}
 
@@ -1436,7 +1517,8 @@ TableMetadataBuilder& 
TableMetadataBuilder::AddPartitionSpec(
 
 TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
     const std::vector<int32_t>& spec_ids) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionSpecs(spec_ids));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
@@ -1464,7 +1546,7 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
 
 TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
     std::shared_ptr<Snapshot> snapshot) {
-  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot)));
   return *this;
 }
 
@@ -1487,7 +1569,8 @@ TableMetadataBuilder& TableMetadataBuilder::SetRef(const 
std::string& name,
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) 
{
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveRef(name));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
@@ -1497,7 +1580,8 @@ TableMetadataBuilder& 
TableMetadataBuilder::RemoveSnapshots(
 
 TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
     const std::vector<int64_t>& snapshot_ids) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSnapshots(snapshot_ids));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 29388d47..7a01bdee 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -178,7 +178,7 @@ std::unique_ptr<TableUpdate> 
SetDefaultPartitionSpec::Clone() const {
 // RemovePartitionSpecs
 
 void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.RemovePartitionSpecs(spec_ids_);
 }
 
 void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) 
const {
@@ -301,7 +301,9 @@ std::unique_ptr<TableUpdate> AddSnapshot::Clone() const {
 
 // RemoveSnapshots
 
-void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
+void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.RemoveSnapshots(snapshot_ids_);
+}
 
 void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
   // RemoveSnapshots doesn't generate any requirements
@@ -322,7 +324,7 @@ std::unique_ptr<TableUpdate> RemoveSnapshots::Clone() const 
{
 // RemoveSnapshotRef
 
 void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.RemoveRef(ref_name_);
 }
 
 void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) 
const {
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 4f4516c7..1f6ab552 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -170,6 +170,7 @@ if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(table_update_test
                    USE_BUNDLE
                    SOURCES
+                   expire_snapshots_test.cc
                    transaction_test.cc
                    update_partition_spec_test.cc
                    update_properties_test.cc
diff --git a/src/iceberg/test/expire_snapshots_test.cc 
b/src/iceberg/test/expire_snapshots_test.cc
new file mode 100644
index 00000000..dbc577a7
--- /dev/null
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/expire_snapshots.h"
+
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+class ExpireSnapshotsTest : public UpdateTestBase {};
+
+// With default retention settings, exactly one snapshot of the fixture table
+// is expected to be expired.
+TEST_F(ExpireSnapshotsTest, DefaultExpireByAge) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  // ASSERT (not EXPECT) on the size: otherwise at(0) below would throw
+  // std::out_of_range on an empty result instead of failing cleanly.
+  ASSERT_EQ(result.snapshot_ids_to_remove.size(), 1u);
+  EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
+}
+
+// Retaining the last two snapshots keeps everything in the fixture table.
+TEST_F(ExpireSnapshotsTest, KeepAll) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->RetainLast(2);
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
+  EXPECT_TRUE(result.refs_to_remove.empty());
+}
+
+TEST_F(ExpireSnapshotsTest, ExpireById) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireSnapshotId(3051729675574597004);
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  ASSERT_EQ(result.snapshot_ids_to_remove.size(), 1u);
+  EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
+}
+
+TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
+  // Assumed to be the timestamp of the expirable fixture snapshot: a cutoff
+  // just before it expires nothing, a cutoff just after it expires one.
+  constexpr int64_t kBoundaryTimestampMs = 1515100955770;
+  struct TestCase {
+    int64_t expire_older_than;
+    size_t expected_num_expired;
+  };
+  const std::vector<TestCase> test_cases = {
+      {.expire_older_than = kBoundaryTimestampMs - 1, .expected_num_expired = 0},
+      {.expire_older_than = kBoundaryTimestampMs + 1, .expected_num_expired = 1}};
+  for (const auto& test_case : test_cases) {
+    // Identify which case failed in the gtest output.
+    SCOPED_TRACE(test_case.expire_older_than);
+    ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+    update->ExpireOlderThan(test_case.expire_older_than);
+    ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+    EXPECT_EQ(result.snapshot_ids_to_remove.size(), test_case.expected_num_expired);
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/table_metadata_builder_test.cc 
b/src/iceberg/test/table_metadata_builder_test.cc
index a0fbe3be..22df7430 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -61,7 +61,8 @@ Result<std::unique_ptr<Schema>> CreateDisorderedSchema() {
 }
 
 // Helper function to create base metadata for tests
-std::unique_ptr<TableMetadata> CreateBaseMetadata() {
+std::unique_ptr<TableMetadata> CreateBaseMetadata(
+    std::shared_ptr<PartitionSpec> spec = nullptr) {
   auto metadata = std::make_unique<TableMetadata>();
   metadata->format_version = 2;
   metadata->table_uuid = "test-uuid-1234";
@@ -71,8 +72,13 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
   metadata->last_column_id = 3;
   metadata->current_schema_id = 0;
   metadata->schemas.push_back(CreateTestSchema());
-  metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
-  metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+  if (spec == nullptr) {
+    metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
+    metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+  } else {
+    metadata->default_spec_id = spec->spec_id();
+    metadata->partition_specs.push_back(std::move(spec));
+  }
   metadata->last_partition_id = 0;
   metadata->current_snapshot_id = kInvalidSnapshotId;
   metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
@@ -1127,4 +1133,123 @@ TEST(TableMetadataBuilderTest, 
RemoveSchemasAfterSchemaChange) {
   ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema: 
1"));
 }
 
+TEST(TableMetadataBuilderTest, RemoveSnapshotRef) {
+  auto initial = CreateBaseMetadata();
+  auto builder = TableMetadataBuilder::BuildFrom(initial.get());
+
+  // Two snapshots, each referenced by its own branch.
+  for (const int64_t id : {int64_t{1}, int64_t{2}}) {
+    builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = id}));
+  }
+  ICEBERG_UNWRAP_OR_FAIL(auto first_branch, SnapshotRef::MakeBranch(1));
+  ICEBERG_UNWRAP_OR_FAIL(auto second_branch, SnapshotRef::MakeBranch(2));
+  builder->SetRef("ref1", std::move(first_branch));
+  builder->SetRef("ref2", std::move(second_branch));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto built, builder->Build());
+  ASSERT_EQ(built->refs.size(), 2);
+
+  // Dropping "ref2" must leave only "ref1" behind.
+  builder = TableMetadataBuilder::BuildFrom(built.get());
+  builder->RemoveRef("ref2");
+  ICEBERG_UNWRAP_OR_FAIL(built, builder->Build());
+  ASSERT_EQ(built->refs.size(), 1);
+  EXPECT_TRUE(built->refs.contains("ref1"));
+}
+
+TEST(TableMetadataBuilderTest, RemoveSnapshot) {
+  auto initial = CreateBaseMetadata();
+  auto builder = TableMetadataBuilder::BuildFrom(initial.get());
+
+  // Seed the table with snapshots 1 and 2.
+  for (const int64_t id : {int64_t{1}, int64_t{2}}) {
+    builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = id}));
+  }
+  ICEBERG_UNWRAP_OR_FAIL(auto built, builder->Build());
+  ASSERT_EQ(built->snapshots.size(), 2);
+
+  // Expire snapshot 2; only snapshot 1 should survive.
+  builder = TableMetadataBuilder::BuildFrom(built.get());
+  builder->RemoveSnapshots(std::vector<int64_t>{2});
+  ICEBERG_UNWRAP_OR_FAIL(built, builder->Build());
+  ASSERT_EQ(built->snapshots.size(), 1);
+  ASSERT_THAT(built->SnapshotById(2), IsError(ErrorKind::kNotFound));
+}
+
+TEST(TableMetadataBuilderTest, RemoveSnapshotNotExist) {
+  auto base = CreateBaseMetadata();
+  auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+  // Add two snapshots
+  builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1}));
+  builder->AddSnapshot(std::make_shared<Snapshot>(Snapshot{.snapshot_id = 2}));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+  ASSERT_EQ(metadata->snapshots.size(), 2);
+
+  // Removing an id that does not exist is a no-op: both snapshots remain
+  builder = TableMetadataBuilder::BuildFrom(metadata.get());
+  builder->RemoveSnapshots(std::vector<int64_t>{3});
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->snapshots.size(), 2);
+
+  // Removing every existing id leaves the table without snapshots
+  builder = TableMetadataBuilder::BuildFrom(metadata.get());
+  builder->RemoveSnapshots(std::vector<int64_t>{1, 2});
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->snapshots.size(), 0);
+  EXPECT_THAT(metadata->SnapshotById(1), IsError(ErrorKind::kNotFound));
+  EXPECT_THAT(metadata->SnapshotById(2), IsError(ErrorKind::kNotFound));
+}
+
+TEST(TableMetadataBuilderTest, RemovePartitionSpec) {
+  PartitionField field1(2, 4, "field1", Transform::Identity());
+  PartitionField field2(3, 5, "field2", Transform::Identity());
+
+  // Spec 1 becomes the table's default spec.
+  ICEBERG_UNWRAP_OR_FAIL(auto default_spec, PartitionSpec::Make(1, {field1}));
+  auto initial = CreateBaseMetadata(std::move(default_spec));
+  auto builder = TableMetadataBuilder::BuildFrom(initial.get());
+
+  // Spec 2 is added on top, so the table now carries two specs.
+  ICEBERG_UNWRAP_OR_FAIL(auto extra_spec, PartitionSpec::Make(2, {field1, field2}));
+  builder->AddPartitionSpec(std::move(extra_spec));
+  ICEBERG_UNWRAP_OR_FAIL(auto built, builder->Build());
+  ASSERT_EQ(built->partition_specs.size(), 2);
+
+  // Dropping the non-default spec 2 leaves only the default one.
+  builder = TableMetadataBuilder::BuildFrom(built.get());
+  builder->RemovePartitionSpecs({2});
+  ICEBERG_UNWRAP_OR_FAIL(built, builder->Build());
+  ASSERT_EQ(built->partition_specs.size(), 1);
+  ASSERT_THAT(built->PartitionSpecById(2), IsError(ErrorKind::kNotFound));
+}
+
+TEST(TableMetadataBuilderTest, RemovePartitionSpecNotExist) {
+  // Spec 1 is the default spec; spec 2 is added afterwards
+  PartitionField field1(2, 4, "field1", Transform::Identity());
+  PartitionField field2(3, 5, "field2", Transform::Identity());
+  ICEBERG_UNWRAP_OR_FAIL(auto spec1, PartitionSpec::Make(1, {field1}));
+
+  auto base = CreateBaseMetadata(std::move(spec1));
+  auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto spec2, PartitionSpec::Make(2, {field1, field2}));
+  builder->AddPartitionSpec(std::move(spec2));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+  ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+  // Removing a non-existing spec id leaves the spec list unchanged
+  builder = TableMetadataBuilder::BuildFrom(metadata.get());
+  builder->RemovePartitionSpecs({3});
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+  // Remove every non-default spec (plus an unknown id). The default spec 1
+  // cannot be removed, so it is the only one left.
+  builder = TableMetadataBuilder::BuildFrom(metadata.get());
+  builder->RemovePartitionSpecs({2, 3});
+  ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+  ASSERT_EQ(metadata->partition_specs.size(), 1);
+  EXPECT_EQ(metadata->partition_specs.front()->spec_id(), 1);
+  EXPECT_THAT(metadata->PartitionSpecById(2), IsError(ErrorKind::kNotFound));
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 6ef942db..f763c567 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -30,6 +30,7 @@
 #include "iceberg/table_requirement.h"
 #include "iceberg/table_requirements.h"
 #include "iceberg/table_update.h"
+#include "iceberg/update/expire_snapshots.h"
 #include "iceberg/update/pending_update.h"
 #include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_partition_spec.h"
@@ -163,6 +164,25 @@ Status Transaction::Apply(PendingUpdate& update) {
         metadata_builder_->AssignUUID();
       }
     } break;
+    case PendingUpdate::Kind::kExpireSnapshots: {
+      auto& expire_snapshots = 
internal::checked_cast<ExpireSnapshots&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
+      if (!result.snapshot_ids_to_remove.empty()) {
+        
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
+      }
+      if (!result.refs_to_remove.empty()) {
+        for (const auto& ref_name : result.refs_to_remove) {
+          metadata_builder_->RemoveRef(ref_name);
+        }
+      }
+      if (!result.partition_spec_ids_to_remove.empty()) {
+        metadata_builder_->RemovePartitionSpecs(
+            std::move(result.partition_spec_ids_to_remove));
+      }
+      if (!result.schema_ids_to_remove.empty()) {
+        
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
+      }
+    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -253,4 +273,11 @@ Result<std::shared_ptr<UpdateSchema>> 
Transaction::NewUpdateSchema() {
   return update_schema;
 }
 
+/// Creates an ExpireSnapshots update bound to this transaction and registers it
+/// via AddUpdate() before handing it back to the caller.
+Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
+  ICEBERG_ASSIGN_OR_RAISE(auto update, ExpireSnapshots::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update));
+  return update;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 3c2395c2..057a27a9 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -78,6 +78,10 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// changes.
   Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
 
+  /// \brief Create a new ExpireSnapshots to remove expired snapshots and commit
+  /// the changes.
+  ///
+  /// The returned update is registered with this transaction.
+  Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+
  private:
   Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
               std::unique_ptr<TableMetadataBuilder> metadata_builder);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index ff49e1ed..c8854031 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -193,6 +193,7 @@ class UpdatePartitionSpec;
 class UpdateProperties;
 class UpdateSchema;
 class UpdateSortOrder;
+class ExpireSnapshots;
 
 /// 
----------------------------------------------------------------------------
 /// TODO: Forward declarations below are not added yet.
diff --git a/src/iceberg/update/expire_snapshots.cc 
b/src/iceberg/update/expire_snapshots.cc
new file mode 100644
index 00000000..68cf08ca
--- /dev/null
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/expire_snapshots.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iterator>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+/// Factory for ExpireSnapshots; fails when `transaction` is null.
+Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create ExpireSnapshots without a transaction");
+  std::shared_ptr<ExpireSnapshots> update(new ExpireSnapshots(std::move(transaction)));
+  return update;
+}
+
+/// Captures "now" once and derives the retention defaults from the table
+/// properties. If GC is disabled, an error is recorded so that Apply() (which
+/// starts with CheckErrors()) refuses to run.
+ExpireSnapshots::ExpireSnapshots(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)),
+      current_time_ms_(CurrentTimePointMs()),
+      default_max_ref_age_ms_(base().properties.Get(TableProperties::kMaxRefAgeMs)),
+      default_min_num_snapshots_(
+          base().properties.Get(TableProperties::kMinSnapshotsToKeep)),
+      default_expire_older_than_(
+          current_time_ms_ - std::chrono::milliseconds(base().properties.Get(
+                                 TableProperties::kMaxSnapshotAgeMs))) {
+  if (base().properties.Get(TableProperties::kGcEnabled)) {
+    return;
+  }
+  AddError(ValidationFailed(
+      "Cannot expire snapshots: GC is disabled (deleting files may "
+      "corrupt other tables)"));
+}
+
+ExpireSnapshots::~ExpireSnapshots() = default;
+
+// Fluent configuration setters: each stores its argument and returns *this.
+
+ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) {
+  // Flag that explicit ids were requested, then remember this one.
+  specified_snapshot_id_ = true;
+  snapshot_ids_to_expire_.push_back(snapshot_id);
+  return *this;
+}
+
+ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) {
+  // Overrides the cutoff derived from the table properties in the constructor.
+  default_expire_older_than_ = TimePointMsFromUnixMs(timestamp_millis);
+  return *this;
+}
+
+ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) {
+  // Non-positive counts are rejected through the builder error mechanism.
+  ICEBERG_BUILDER_CHECK(num_snapshots >= 1,
+                        "Number of snapshots to retain must be positive: {}",
+                        num_snapshots);
+  default_min_num_snapshots_ = num_snapshots;
+  return *this;
+}
+
+ExpireSnapshots& ExpireSnapshots::DeleteWith(
+    std::function<void(const std::string&)> delete_func) {
+  delete_func_ = std::move(delete_func);
+  return *this;
+}
+
+ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) {
+  cleanup_level_ = level;
+  return *this;
+}
+
+ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) {
+  clean_expired_metadata_ = clean;
+  return *this;
+}
+
+/// Collects the ids a single branch must keep, walking the ancestry returned by
+/// SnapshotUtil::AncestorsOf(snapshot_id, ...) (assumed newest-first, since the
+/// walk stops at the first ancestor that fails both conditions).
+///
+/// An ancestor is retained while fewer than `min_snapshots_to_keep` ids have
+/// been collected, or while its timestamp is not older than
+/// `expire_snapshot_older_than`.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::ComputeBranchSnapshotsToRetain(
+    int64_t snapshot_id, TimePointMs expire_snapshot_older_than,
+    int32_t min_snapshots_to_keep) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors,
+                          SnapshotUtil::AncestorsOf(snapshot_id, [this](int64_t id) {
+                            return base().SnapshotById(id);
+                          }));
+
+  std::unordered_set<int64_t> ids_to_retain;
+  ids_to_retain.reserve(ancestors.size());
+
+  for (const auto& ancestor : ancestors) {
+    ICEBERG_DCHECK(ancestor != nullptr, "Ancestor snapshot is null");
+    // std::cmp_less compares the unsigned size and the signed minimum by value.
+    // A plain `<` would convert a negative minimum to a huge size_t and retain
+    // every ancestor; with cmp_less it simply disables count-based retention.
+    if (std::cmp_less(ids_to_retain.size(), min_snapshots_to_keep) ||
+        ancestor->timestamp_ms >= expire_snapshot_older_than) {
+      ids_to_retain.insert(ancestor->snapshot_id);
+    } else {
+      break;
+    }
+  }
+
+  return ids_to_retain;
+}
+
+/// Unions the retained ids of every branch in `refs`. Each branch's own
+/// max_snapshot_age_ms / min_snapshots_to_keep override the table defaults.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::ComputeAllBranchSnapshotIdsToRetain(
+    const SnapshotToRef& refs) const {
+  std::unordered_set<int64_t> retained_ids;
+  for (const auto& [name, ref] : refs) {
+    // Only branch refs are considered here.
+    if (ref->type() != SnapshotRefType::kBranch) {
+      continue;
+    }
+    const auto& branch = std::get<SnapshotRef::Branch>(ref->retention);
+
+    const TimePointMs older_than =
+        branch.max_snapshot_age_ms
+            ? current_time_ms_ - std::chrono::milliseconds(*branch.max_snapshot_age_ms)
+            : default_expire_older_than_;
+    const int32_t keep_at_least =
+        branch.min_snapshots_to_keep.value_or(default_min_num_snapshots_);
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto branch_ids,
+        ComputeBranchSnapshotsToRetain(ref->snapshot_id, older_than, keep_at_least));
+    retained_ids.merge(branch_ids);
+  }
+  return retained_ids;
+}
+
+/// Returns ids of snapshots that no ref reaches but that are still newer than
+/// the default expiration cutoff.
+Result<std::unordered_set<int64_t>> ExpireSnapshots::UnreferencedSnapshotIdsToRetain(
+    const SnapshotToRef& refs) const {
+  // Everything reachable from a ref: the full ancestry of each branch and the
+  // single snapshot any other ref points at.
+  std::unordered_set<int64_t> referenced_ids;
+  for (const auto& [name, ref] : refs) {
+    if (ref->type() != SnapshotRefType::kBranch) {
+      referenced_ids.insert(ref->snapshot_id);
+      continue;
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto ancestors,
+                            SnapshotUtil::AncestorsOf(ref->snapshot_id, [this](int64_t id) {
+                              return base().SnapshotById(id);
+                            }));
+    for (const auto& ancestor : ancestors) {
+      ICEBERG_DCHECK(ancestor != nullptr, "Ancestor snapshot is null");
+      referenced_ids.insert(ancestor->snapshot_id);
+    }
+  }
+
+  std::unordered_set<int64_t> ids_to_retain;
+  for (const auto& snapshot : base().snapshots) {
+    ICEBERG_DCHECK(snapshot != nullptr, "Snapshot is null");
+    const bool unreferenced = !referenced_ids.contains(snapshot->snapshot_id);
+    const bool too_recent_to_expire = snapshot->timestamp_ms > default_expire_older_than_;
+    if (unreferenced && too_recent_to_expire) {
+      ids_to_retain.insert(snapshot->snapshot_id);
+    }
+  }
+  return ids_to_retain;
+}
+
+/// Returns the subset of `refs` that survives expiration. The main branch is always
+/// kept; any other ref is kept only if its snapshot exists and is no older than the
+/// ref's max ref age (or the table-wide default when the ref sets none).
+Result<ExpireSnapshots::SnapshotToRef> ExpireSnapshots::ComputeRetainedRefs(
+    const SnapshotToRef& refs) const {
+  const TableMetadata& metadata = this->base();
+  SnapshotToRef kept;
+
+  for (const auto& [name, ref] : refs) {
+    if (name == SnapshotRef::kMainBranch) {
+      kept[name] = ref;
+      continue;
+    }
+
+    auto found = metadata.SnapshotById(ref->snapshot_id);
+    if (!found.has_value() && found.error().kind != ErrorKind::kNotFound) {
+      // Any lookup failure other than "not found" is a real error.
+      ICEBERG_RETURN_UNEXPECTED(found);
+    }
+    // Refs that point to a non-existing snapshot are invalid and are dropped.
+    if (!found.has_value() || found.value() == nullptr) {
+      continue;
+    }
+
+    const auto max_ref_age_ms = ref->max_ref_age_ms().value_or(default_max_ref_age_ms_);
+    const auto ref_age = current_time_ms_ - found.value()->timestamp_ms;
+    if (ref_age <= std::chrono::milliseconds(max_ref_age_ms)) {
+      kept[name] = ref;
+    }
+  }
+
+  return kept;
+}
+
+/// Computes which refs, snapshots and (optionally) partition specs / schemas should be
+/// removed from the base table metadata. Nothing is modified here; the caller applies
+/// the returned ApplyResult.
+Result<ExpireSnapshots::ApplyResult> ExpireSnapshots::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  const TableMetadata& base = this->base();
+  // Attempt to clean expired metadata even if there are no snapshots to expire.
+  // Table metadata builder takes care of the case when this should actually be a no-op.
+  if (base.snapshots.empty() && !clean_expired_metadata_) {
+    return {};
+  }
+
+  // 1. Decide which refs survive; the head snapshot of every retained ref is kept.
+  std::unordered_set<int64_t> ids_to_retain;
+  ICEBERG_ASSIGN_OR_RAISE(auto retained_refs, ComputeRetainedRefs(base.refs));
+  std::unordered_map<int64_t, std::vector<std::string>> retained_id_to_refs;
+  for (const auto& [key, ref] : retained_refs) {
+    retained_id_to_refs[ref->snapshot_id].push_back(key);
+    ids_to_retain.insert(ref->snapshot_id);
+  }
+
+  // 2. A snapshot that is still the head of a retained ref cannot be expired by id.
+  for (int64_t id : snapshot_ids_to_expire_) {
+    ICEBERG_PRECHECK(!retained_id_to_refs.contains(id),
+                     "Cannot expire {}. Still referenced by refs", id);
+  }
+
+  // 3. Keep snapshots protected by branch retention policies, and unreferenced
+  //    snapshots that are not yet old enough to expire.
+  ICEBERG_ASSIGN_OR_RAISE(auto all_branch_snapshot_ids,
+                          ComputeAllBranchSnapshotIdsToRetain(retained_refs));
+  ICEBERG_ASSIGN_OR_RAISE(auto unreferenced_snapshot_ids,
+                          UnreferencedSnapshotIdsToRetain(retained_refs));
+  ids_to_retain.insert(all_branch_snapshot_ids.begin(), all_branch_snapshot_ids.end());
+  ids_to_retain.insert(unreferenced_snapshot_ids.begin(),
+                       unreferenced_snapshot_ids.end());
+
+  // 4. Snapshots explicitly requested through ExpireSnapshotId() are expired even when
+  //    an age or RetainLast() rule from step 3 would otherwise keep them; step 2 has
+  //    already verified none of them is a retained ref head. Without this, expiring a
+  //    recent snapshot by id would be silently ignored. Ids that do not exist in the
+  //    base metadata are simply never reported below.
+  for (int64_t id : snapshot_ids_to_expire_) {
+    ids_to_retain.erase(id);
+  }
+
+  ApplyResult result;
+  for (const auto& entry : base.refs) {
+    if (!retained_refs.contains(entry.first)) {
+      result.refs_to_remove.push_back(entry.first);
+    }
+  }
+  for (const auto& snapshot : base.snapshots) {
+    if (snapshot != nullptr && !ids_to_retain.contains(snapshot->snapshot_id)) {
+      result.snapshot_ids_to_remove.push_back(snapshot->snapshot_id);
+    }
+  }
+
+  if (clean_expired_metadata_) {
+    // The default spec and the current schema are always considered reachable.
+    std::unordered_set<int32_t> reachable_specs = {base.default_spec_id};
+    std::unordered_set<int32_t> reachable_schemas = {base.current_schema_id};
+
+    // TODO(xiao.dong) parallel processing
+    for (int64_t snapshot_id : ids_to_retain) {
+      ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id));
+      SnapshotCache snapshot_cache(snapshot.get());
+      ICEBERG_ASSIGN_OR_RAISE(auto manifests,
+                              snapshot_cache.Manifests(transaction_->table()->io()));
+      for (const auto& manifest : manifests) {
+        reachable_specs.insert(manifest.partition_spec_id);
+      }
+      if (snapshot->schema_id.has_value()) {
+        reachable_schemas.insert(snapshot->schema_id.value());
+      }
+    }
+
+    for (const auto& spec : base.partition_specs) {
+      if (!reachable_specs.contains(spec->spec_id())) {
+        result.partition_spec_ids_to_remove.emplace_back(spec->spec_id());
+      }
+    }
+    for (const auto& schema : base.schemas) {
+      if (!reachable_schemas.contains(schema->schema_id())) {
+        result.schema_ids_to_remove.insert(schema->schema_id());
+      }
+    }
+  }
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.h 
b/src/iceberg/update/expire_snapshots.h
new file mode 100644
index 00000000..17a4d8b0
--- /dev/null
+++ b/src/iceberg/update/expire_snapshots.h
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+#include "iceberg/util/timepoint.h"
+
+/// \file iceberg/update/expire_snapshots.h
+/// \brief API for removing old snapshots from a table.
+
+namespace iceberg {
+
+/// \brief An enum representing possible clean up levels used in snapshot expiration.
+enum class CleanupLevel : uint8_t {
+  /// Skip all file cleanup, only remove snapshot metadata.
+  kNone,
+  /// Clean up only metadata files (manifests, manifest lists, statistics), retain data
+  /// files.
+  kMetadataOnly,
+  /// Clean up both metadata and data files (default).
+  kAll
+};
+
+/// \brief API for removing old snapshots from a table.
+///
+/// This API accumulates snapshot deletions and commits the new list to the table. This
+/// API does not allow deleting the current snapshot.
+///
+/// When committing, these changes will be applied to the latest table metadata. Commit
+/// conflicts will be resolved by applying the changes to the new latest metadata and
+/// reattempting the commit.
+///
+/// A snapshot is retained when it is the head of a retained ref, when its branch's
+/// retention policy (maximum snapshot age / minimum number of snapshots, falling back to
+/// the table-wide defaults) keeps it, or when it is unreferenced but newer than the
+/// expiration cutoff.
+///
+/// Apply() only computes the resulting changes (see ApplyResult); it does not itself
+/// delete manifest or data files, and it does not consult the DeleteWith() or
+/// CleanupLevel() settings.
+class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate {
+ public:
+  /// \brief Creates an ExpireSnapshots update that operates within `transaction`.
+  static Result<std::shared_ptr<ExpireSnapshots>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~ExpireSnapshots() override;
+
+  /// \brief The metadata changes computed by Apply().
+  struct ApplyResult {
+    /// Names of refs in the base metadata that are not retained (older than their
+    /// maximum ref age, or pointing at a snapshot that no longer exists).
+    std::vector<std::string> refs_to_remove;
+    /// Ids of snapshots to remove from the table metadata.
+    std::vector<int64_t> snapshot_ids_to_remove;
+    /// Ids of partition specs not used by any retained snapshot's manifests. Only
+    /// filled when CleanExpiredMetadata(true) was requested; never contains the
+    /// default spec.
+    std::vector<int32_t> partition_spec_ids_to_remove;
+    /// Ids of schemas not used by any retained snapshot. Only filled when
+    /// CleanExpiredMetadata(true) was requested; never contains the current schema.
+    std::unordered_set<int32_t> schema_ids_to_remove;
+  };
+
+  /// \brief Expires a specific snapshot identified by id.
+  ///
+  /// Apply() fails if the snapshot is still the head of a retained ref.
+  ///
+  /// \param snapshot_id Id of the snapshot to expire.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id);
+
+  /// \brief Expires all snapshots older than the given timestamp.
+  ///
+  /// \param timestamp_millis Cutoff timestamp, in milliseconds.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis);
+
+  /// \brief Retains the most recent ancestors of the current snapshot.
+  ///
+  /// If a snapshot would be expired because it is older than the expiration timestamp,
+  /// but is one of the num_snapshots most recent ancestors of the current state, it
+  /// will be retained. This does not prevent snapshots explicitly identified by id from
+  /// expiring.
+  ///
+  /// This may keep more than num_snapshots ancestors if snapshots are added
+  /// concurrently. This may keep fewer than num_snapshots ancestors if the current
+  /// table state does not have that many.
+  ///
+  /// \param num_snapshots The number of snapshots to retain.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& RetainLast(int num_snapshots);
+
+  /// \brief Passes an alternative delete implementation that will be used for
+  /// manifests and data files.
+  ///
+  /// Manifest files that are no longer used by valid snapshots will be deleted. Data
+  /// files that were deleted by snapshots that are expired will be deleted.
+  ///
+  /// If this method is not called, unnecessary manifests and data files will still be
+  /// deleted.
+  ///
+  /// NOTE(review): Apply() does not invoke this function; confirm where file cleanup
+  /// consumes it.
+  ///
+  /// \param delete_func A function that will be called to delete manifests and data
+  /// files.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func);
+
+  /// \brief Configures the cleanup level for expired files.
+  ///
+  /// This method provides fine-grained control over which files are cleaned up during
+  /// snapshot expiration.
+  ///
+  /// Consider CleanupLevel::kMetadataOnly when data files are shared across tables or
+  /// when using procedures like add-files that may reference the same data files.
+  ///
+  /// Consider CleanupLevel::kNone when data and metadata files may be more efficiently
+  /// removed using a distributed framework through the actions API.
+  ///
+  /// \param level The cleanup level to use for expired snapshots.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& CleanupLevel(enum CleanupLevel level);
+
+  /// \brief Enable cleaning up unused metadata, such as partition specs, schemas, etc.
+  ///
+  /// When enabled, Apply() also reports partition specs and schemas that are no longer
+  /// reachable from any retained snapshot (see ApplyResult).
+  ///
+  /// \param clean Remove unused partition specs, schemas, or other metadata when true.
+  /// \return Reference to this for method chaining.
+  ExpireSnapshots& CleanExpiredMetadata(bool clean);
+
+  Kind kind() const final { return Kind::kExpireSnapshots; }
+
+  /// \brief Computes the pending changes against the base table metadata.
+  ///
+  /// The table itself is not modified by this call.
+  ///
+  /// \return The refs, snapshots and (optionally) partition specs and schemas to
+  /// remove, or an error if validation or a metadata lookup fails.
+  Result<ApplyResult> Apply();
+
+ private:
+  explicit ExpireSnapshots(std::shared_ptr<Transaction> transaction);
+
+  using SnapshotToRef = std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>;
+
+  /// Returns the refs that survive expiration (main always; others only if their
+  /// snapshot exists and is within the ref's max age).
+  Result<SnapshotToRef> ComputeRetainedRefs(const SnapshotToRef& refs) const;
+
+  /// Walks the ancestry of `snapshot_id`, keeping snapshots while fewer than
+  /// `min_snapshots_to_keep` are kept or while they are not older than
+  /// `expire_snapshot_older_than`.
+  Result<std::unordered_set<int64_t>> ComputeBranchSnapshotsToRetain(
+      int64_t snapshot_id, TimePointMs expire_snapshot_older_than,
+      int32_t min_snapshots_to_keep) const;
+
+  /// Union of ComputeBranchSnapshotsToRetain() over every branch in `refs`.
+  Result<std::unordered_set<int64_t>> ComputeAllBranchSnapshotIdsToRetain(
+      const SnapshotToRef& refs) const;
+
+  /// Snapshots unreachable from `refs` that are newer than the expiration cutoff.
+  Result<std::unordered_set<int64_t>> UnreferencedSnapshotIdsToRetain(
+      const SnapshotToRef& refs) const;
+
+  const TimePointMs current_time_ms_;
+  const int64_t default_max_ref_age_ms_;
+  int32_t default_min_num_snapshots_;
+  TimePointMs default_expire_older_than_;
+  std::function<void(const std::string&)> delete_func_;
+  std::vector<int64_t> snapshot_ids_to_expire_;
+  enum CleanupLevel cleanup_level_{CleanupLevel::kAll};
+  bool clean_expired_metadata_{false};
+  bool specified_snapshot_id_{false};
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h 
b/src/iceberg/update/pending_update.h
index 2124d7e1..8a8329ee 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -42,6 +42,7 @@ namespace iceberg {
 class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
  public:
   enum class Kind : uint8_t {
+    kExpireSnapshots,
     kUpdatePartitionSpec,
     kUpdateProperties,
     kUpdateSchema,
diff --git a/src/iceberg/util/snapshot_util.cc 
b/src/iceberg/util/snapshot_util.cc
index 4395dc27..c3b93be8 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -46,6 +46,14 @@ Result<std::vector<std::shared_ptr<Snapshot>>> 
SnapshotUtil::AncestorsOf(
   });
 }
 
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    int64_t snapshot_id,
+    const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
+  // Resolve the starting snapshot first; a failed lookup is handed back unchanged.
+  ICEBERG_ASSIGN_OR_RAISE(auto start, lookup(snapshot_id));
+  // Then walk its parent chain with the Snapshot-based overload, reusing `lookup`.
+  return AncestorsOf(start, lookup);
+}
+
 Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t 
snapshot_id,
                                         int64_t ancestor_snapshot_id) {
   ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
diff --git a/src/iceberg/util/snapshot_util_internal.h 
b/src/iceberg/util/snapshot_util_internal.h
index befa96fe..ca106cb3 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -44,6 +44,15 @@ class ICEBERG_EXPORT SnapshotUtil {
   static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const 
Table& table,
                                                                     int64_t 
snapshot_id);
 
+  /// \brief Returns the ancestry of the snapshot identified by `snapshot_id`.
+  ///
+  /// The starting snapshot is resolved through `lookup`, and the walk over its parents
+  /// is delegated to the Snapshot-based overload of AncestorsOf using the same lookup.
+  ///
+  /// \param snapshot_id The snapshot ID to start from
+  /// \param lookup A function to look up snapshots by ID
+  /// \return A vector of ancestor snapshots (presumably beginning with the starting
+  ///         snapshot itself; confirm against the Snapshot-based overload), or the
+  ///         error returned by `lookup` when `snapshot_id` cannot be resolved.
+  static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+      int64_t snapshot_id,
+      const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);
+
   /// \brief Returns whether ancestor_snapshot_id is an ancestor of 
snapshot_id.
   ///
   /// \param table The table to check


Reply via email to