This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e7b20ab feat: add IncrementalFileCleanup strategy and dispatch in ExpireSnapshots::Finalize (#648)
3e7b20ab is described below

commit 3e7b20ab8555f51105e56176af23f22641fcea18
Author: Xinli Shang <[email protected]>
AuthorDate: Wed May 20 08:22:49 2026 -0700

    feat: add IncrementalFileCleanup strategy and dispatch in ExpireSnapshots::Finalize (#648)
    
    Mirrors Java's IncrementalFileCleanup for the linear-ancestry case: each
    manifest is attributed to the snapshot that wrote it (added_snapshot_id),
    so two passes over snapshots suffice instead of a full reachability scan.
    Cherry-pick protection via SnapshotSummaryFields::kSourceSnapshotId is
    preserved.
    
    Finalize() now picks IncrementalFileCleanup when the expiration is
    "simple" (no explicit snapshot IDs, no removed snapshots outside the
    current main ancestry, and no retained snapshots outside the current
    main ancestry), and falls back to ReachableFileCleanup otherwise. This
    dispatch matches Java's RemoveSnapshots.cleanExpiredSnapshots.
    
    Two existing cleanup tests (DeletesExpiredFiles,
    IgnoresExpiredDeleteManifestReadFailures) used an empty current manifest
    list. That is an orphan scenario (the expired data is no longer reachable
    from any retained snapshot) which only ReachableFileCleanup can resolve.
    They now call ExpireSnapshotId() to force the reachable path, which
    preserves their original intent and matches Java's behavior. New tests
    cover both dispatch branches.
    
    ---------
    
    Co-authored-by: shangxinli <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 src/iceberg/manifest/manifest_group.cc    |   8 +-
 src/iceberg/manifest/manifest_reader.cc   |  14 ++
 src/iceberg/manifest/manifest_reader.h    |  11 +
 src/iceberg/test/expire_snapshots_test.cc | 258 ++++++++++++++++++++
 src/iceberg/update/expire_snapshots.cc    | 376 ++++++++++++++++++++++++++++--
 src/iceberg/update/expire_snapshots.h     |   1 +
 6 files changed, 636 insertions(+), 32 deletions(-)

diff --git a/src/iceberg/manifest/manifest_group.cc 
b/src/iceberg/manifest/manifest_group.cc
index 220b8585..8af717b2 100644
--- a/src/iceberg/manifest/manifest_group.cc
+++ b/src/iceberg/manifest/manifest_group.cc
@@ -262,14 +262,8 @@ Result<std::vector<ManifestEntry>> 
ManifestGroup::Entries() {
 
 Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
     const ManifestFile& manifest) {
-  auto spec_it = specs_by_id_.find(manifest.partition_spec_id);
-  if (spec_it == specs_by_id_.end()) {
-    return InvalidArgument("Partition spec {} not found for manifest {}",
-                           manifest.partition_spec_id, manifest.manifest_path);
-  }
-
   ICEBERG_ASSIGN_OR_RAISE(auto reader,
-                          ManifestReader::Make(manifest, io_, schema_, 
spec_it->second));
+                          ManifestReader::Make(manifest, io_, schema_, 
specs_by_id_));
 
   reader->FilterRows(data_filter_)
       .FilterPartitions(partition_filter_)
diff --git a/src/iceberg/manifest/manifest_reader.cc 
b/src/iceberg/manifest/manifest_reader.cc
index 53100b23..7747e2be 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -21,6 +21,7 @@
 
 #include <algorithm>
 #include <memory>
+#include <optional>
 #include <ranges>
 #include <type_traits>
 #include <unordered_set>
@@ -998,6 +999,19 @@ Result<std::unique_ptr<ManifestReader>> 
ManifestReader::Make(
       manifest.first_row_id);
 }
 
+Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
+    const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
+    std::shared_ptr<Schema> schema,
+    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& 
specs_by_id) {
+  auto spec_it = specs_by_id.find(manifest.partition_spec_id);
+  if (spec_it == specs_by_id.end() || spec_it->second == nullptr) {
+    return InvalidArgument("Partition spec {} not found for manifest {}",
+                           manifest.partition_spec_id, manifest.manifest_path);
+  }
+  auto spec = spec_it->second;
+  return Make(manifest, std::move(file_io), std::move(schema), 
std::move(spec));
+}
+
 Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
     std::string_view manifest_location, std::optional<int64_t> manifest_length,
     std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
diff --git a/src/iceberg/manifest/manifest_reader.h 
b/src/iceberg/manifest/manifest_reader.h
index 1a142021..42c56e1c 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -92,6 +92,17 @@ class ICEBERG_EXPORT ManifestReader {
       const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
       std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
 
+  /// \brief Creates a reader for a manifest file using specs keyed by ID.
+  /// \param manifest A ManifestFile object containing metadata about the 
manifest.
+  /// \param file_io File IO implementation to use.
+  /// \param schema Schema used to bind the partition type.
+  /// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
+  /// \return A Result containing the reader or an error.
+  static Result<std::unique_ptr<ManifestReader>> Make(
+      const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
+      std::shared_ptr<Schema> schema,
+      const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& 
specs_by_id);
+
   /// \brief Creates a reader for a manifest file.
   /// \param manifest_location Path to the manifest file.
   /// \param manifest_length Length of the manifest file.
diff --git a/src/iceberg/test/expire_snapshots_test.cc 
b/src/iceberg/test/expire_snapshots_test.cc
index 4dcc72d6..3a99b000 100644
--- a/src/iceberg/test/expire_snapshots_test.cc
+++ b/src/iceberg/test/expire_snapshots_test.cc
@@ -28,6 +28,9 @@
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
 #include "iceberg/statistics_file.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/test/matchers.h"
@@ -135,6 +138,13 @@ class ExpireSnapshotsCleanupTest : public UpdateTestBase {
     return manifest_result.value();
   }
 
+  ManifestFile AssignManifestSequenceNumber(ManifestFile manifest,
+                                            int64_t sequence_number) const {
+    manifest.sequence_number = sequence_number;
+    manifest.min_sequence_number = sequence_number;
+    return manifest;
+  }
+
   ManifestFile WriteDeleteManifest(const std::string& path, int64_t 
snapshot_id,
                                    std::vector<ManifestEntry> entries) {
     auto writer_result = ManifestWriter::MakeWriter(
@@ -227,6 +237,15 @@ TEST_F(ExpireSnapshotsTest, ExpireById) {
   EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
 }
 
+TEST_F(ExpireSnapshotsTest, ExpireByIdOverridesRetainLast) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->RetainLast(2);
+  update->ExpireSnapshotId(3051729675574597004);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_THAT(result.snapshot_ids_to_remove, 
testing::ElementsAre(3051729675574597004));
+}
+
 TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
   struct TestCase {
     int64_t expire_older_than;
@@ -243,6 +262,30 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
   }
 }
 
+TEST_F(ExpireSnapshotsCleanupTest, 
RetainsUnreferencedSnapshotAtExpireThreshold) {
+  const int64_t unreferenced_snapshot_id = 4055729675574597004;
+  const int64_t expire_at_ms = 1515100955770;
+
+  auto metadata = ReloadMetadata();
+  metadata->snapshots.push_back(std::make_shared<Snapshot>(Snapshot{
+      .snapshot_id = unreferenced_snapshot_id,
+      .parent_snapshot_id = std::nullopt,
+      .sequence_number = 2,
+      .timestamp_ms = TimePointMsFromUnixMs(expire_at_ms),
+      .manifest_list = table_location_ + "/metadata/unreferenced.avro",
+      .summary = {{SnapshotSummaryFields::kOperation, "append"}},
+      .schema_id = metadata->current_schema_id,
+  }));
+  RewriteTable(std::move(metadata));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireOlderThan(expire_at_ms);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_THAT(result.snapshot_ids_to_remove,
+              testing::Not(testing::Contains(unreferenced_snapshot_id)));
+}
+
 TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
@@ -350,6 +393,8 @@ TEST_F(ExpireSnapshotsCleanupTest, 
IgnoresExpiredDeleteManifestReadFailures) {
 
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  // Force the reachable path.
+  update->ExpireSnapshotId(kExpiredSnapshotId);
   update->DeleteWith(
       [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
 
@@ -388,6 +433,7 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
 
   std::vector<std::string> deleted_files;
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireSnapshotId(kExpiredSnapshotId);
   update->DeleteWith(
       [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
 
@@ -573,4 +619,216 @@ TEST_F(ExpireSnapshotsCleanupTest, 
KeepsReusedPartitionStats) {
   EXPECT_THAT(deleted_files, 
testing::Not(testing::Contains(reused_statistics_path)));
 }
 
+TEST_F(ExpireSnapshotsCleanupTest, 
IncrementalDispatchPreservesAncestorAddedFiles) {
+  const auto expired_data_file_path = table_location_ + 
"/data/expired-data.parquet";
+  const auto expired_data_manifest_path = table_location_ + 
"/metadata/expired-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-manifest-list.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-manifest-list.avro";
+
+  auto expired_data_manifest = WriteDataManifest(
+      expired_data_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, 
kExpiredSequenceNumber,
+                 MakeDataFile(expired_data_file_path))});
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber,
+                    {expired_data_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+  RewriteTableWithManifestLists(expired_manifest_list_path, 
current_manifest_list_path);
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_THAT(deleted_files, testing::Contains(expired_data_manifest_path));
+  EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path));
+  EXPECT_THAT(deleted_files, 
testing::Not(testing::Contains(expired_data_file_path)));
+}
+
+TEST_F(ExpireSnapshotsCleanupTest, IncrementalDeletesExpiredDeletedEntries) {
+  const auto deleted_data_file_path =
+      table_location_ + "/data/deleted-by-expired.parquet";
+  const auto delete_manifest_path =
+      table_location_ + "/metadata/expired-delete-entry.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-deleted-entry-ml.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-deleted-entry-ml.avro";
+
+  auto delete_manifest = WriteDataManifest(
+      delete_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kDeleted, kExpiredSnapshotId, 
kExpiredSequenceNumber,
+                 MakeDataFile(deleted_data_file_path))});
+  delete_manifest =
+      AssignManifestSequenceNumber(std::move(delete_manifest), 
kExpiredSequenceNumber);
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, 
{delete_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {delete_manifest});
+  RewriteTableWithManifestLists(expired_manifest_list_path, 
current_manifest_list_path);
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_THAT(deleted_files, testing::Contains(deleted_data_file_path));
+  EXPECT_THAT(deleted_files, testing::Contains(expired_manifest_list_path));
+  EXPECT_THAT(deleted_files, 
testing::Not(testing::Contains(delete_manifest_path)));
+}
+
+TEST_F(ExpireSnapshotsCleanupTest, ReachableDispatchDeletesUnreachableData) {
+  const auto expired_data_file_path = table_location_ + 
"/data/expired-data.parquet";
+  const auto expired_data_manifest_path = table_location_ + 
"/metadata/expired-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-manifest-list.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-manifest-list.avro";
+
+  auto expired_data_manifest = WriteDataManifest(
+      expired_data_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, 
kExpiredSequenceNumber,
+                 MakeDataFile(expired_data_file_path))});
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber,
+                    {expired_data_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+  RewriteTableWithManifestLists(expired_manifest_list_path, 
current_manifest_list_path);
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireSnapshotId(kExpiredSnapshotId);
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_THAT(deleted_files, 
testing::UnorderedElementsAre(expired_data_file_path,
+                                                           
expired_data_manifest_path,
+                                                           
expired_manifest_list_path));
+}
+
+TEST_F(ExpireSnapshotsCleanupTest, 
IncrementalSkipsCherryPickedSnapshotCleanup) {
+  const auto picked_data_file_path = table_location_ + 
"/data/picked-data.parquet";
+  const auto picked_manifest_path = table_location_ + 
"/metadata/picked-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-picked-ml.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-picked-ml.avro";
+
+  auto picked_manifest = WriteDataManifest(
+      picked_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, 
kExpiredSequenceNumber,
+                 MakeDataFile(picked_data_file_path))});
+  picked_manifest =
+      AssignManifestSequenceNumber(std::move(picked_manifest), 
kExpiredSequenceNumber);
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, 
{picked_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {picked_manifest});
+
+  auto metadata = ReloadMetadata();
+  ASSERT_EQ(metadata->snapshots.size(), 2);
+  metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
+  metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
+  metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] 
=
+      std::to_string(kExpiredSnapshotId);
+  RewriteTable(std::move(metadata));
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_TRUE(deleted_files.empty());
+  auto committed_metadata = ReloadMetadata();
+  EXPECT_EQ(committed_metadata->snapshots.size(), 1);
+  EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, 
kCurrentSnapshotId);
+}
+
+TEST_F(ExpireSnapshotsCleanupTest, 
ReachableCleanupFailsClosedOnUnbindableExpiredSpec) {
+  const auto expired_data_file_path = table_location_ + 
"/data/expired-data.parquet";
+  const auto expired_data_manifest_path = table_location_ + 
"/metadata/expired-data.avro";
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-manifest-list.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-manifest-list.avro";
+
+  auto expired_data_manifest = WriteDataManifest(
+      expired_data_manifest_path, kExpiredSnapshotId,
+      {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, 
kExpiredSequenceNumber,
+                 MakeDataFile(expired_data_file_path))});
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber,
+                    {expired_data_manifest});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+
+  auto metadata = ReloadMetadata();
+  ASSERT_EQ(metadata->snapshots.size(), 2);
+  metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
+  metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
+  ICEBERG_UNWRAP_OR_FAIL(auto retained_spec, 
PartitionSpec::Make(/*spec_id=*/1, {}));
+  metadata->partition_specs.push_back(
+      std::shared_ptr<PartitionSpec>(std::move(retained_spec)));
+  metadata->default_spec_id = 1;
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto retained_schema,
+      Schema::Make(std::vector<SchemaField>{SchemaField::MakeRequired(2, "y", 
int64()),
+                                            SchemaField::MakeRequired(3, "z", 
int64())},
+                   /*schema_id=*/2, std::vector<int32_t>{}));
+  
metadata->schemas.push_back(std::shared_ptr<Schema>(std::move(retained_schema)));
+  metadata->current_schema_id = 2;
+  metadata->snapshots.at(1)->schema_id = 2;
+  RewriteTable(std::move(metadata));
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->ExpireSnapshotId(kExpiredSnapshotId);
+  update->CleanExpiredMetadata(true);
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_THAT(deleted_files, 
testing::UnorderedElementsAre(expired_data_manifest_path,
+                                                           
expired_manifest_list_path));
+  EXPECT_THAT(deleted_files, 
testing::Not(testing::Contains(expired_data_file_path)));
+}
+
+TEST_F(ExpireSnapshotsCleanupTest, 
CommitIgnoresMalformedSourceSnapshotIdCleanup) {
+  const auto expired_manifest_list_path =
+      table_location_ + "/metadata/expired-malformed-ml.avro";
+  const auto current_manifest_list_path =
+      table_location_ + "/metadata/current-malformed-ml.avro";
+  WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId,
+                    /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {});
+  WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, 
kExpiredSnapshotId,
+                    kCurrentSequenceNumber, {});
+
+  auto metadata = ReloadMetadata();
+  ASSERT_EQ(metadata->snapshots.size(), 2);
+  metadata->snapshots.at(0)->manifest_list = expired_manifest_list_path;
+  metadata->snapshots.at(1)->manifest_list = current_manifest_list_path;
+  metadata->snapshots.at(1)->summary[SnapshotSummaryFields::kSourceSnapshotId] 
=
+      "not-a-number";
+  RewriteTable(std::move(metadata));
+
+  std::vector<std::string> deleted_files;
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
+  update->DeleteWith(
+      [&deleted_files](const std::string& path) { 
deleted_files.push_back(path); });
+
+  EXPECT_THAT(update->Commit(), IsOk());
+  EXPECT_TRUE(deleted_files.empty());
+  auto committed_metadata = ReloadMetadata();
+  EXPECT_EQ(committed_metadata->snapshots.size(), 1);
+  EXPECT_EQ(committed_metadata->snapshots.at(0)->snapshot_id, 
kCurrentSnapshotId);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.cc 
b/src/iceberg/update/expire_snapshots.cc
index ce65882c..8e109d08 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -23,14 +23,16 @@
 #include <cstdint>
 #include <iterator>
 #include <memory>
-#include <optional>
 #include <string>
+#include <tuple>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include "iceberg/file_io.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/statistics_file.h"
@@ -40,18 +42,23 @@
 #include "iceberg/util/error_collector.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/string_util.h"
 
 namespace iceberg {
 
 namespace {
 
-Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
+Result<std::unique_ptr<ManifestReader>> MakeManifestReader(
     const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
     const TableMetadata& metadata) {
+  // TODO(gangwu): Build manifest file schemas from 
PartitionSpec::RawPartitionType
+  // with UnknownType for dropped source fields instead of requiring the table 
schema
+  // to bind every partition source field. Until then, cleanup fails closed 
when
+  // historical specs cannot bind to the metadata schema.
   ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
-  ICEBERG_ASSIGN_OR_RAISE(auto spec,
-                          
metadata.PartitionSpecById(manifest.partition_spec_id));
-  return ManifestReader::Make(manifest, file_io, std::move(schema), 
std::move(spec));
+  TableMetadataCache metadata_cache(&metadata);
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+  return ManifestReader::Make(manifest, file_io, std::move(schema), 
specs_by_id.get());
 }
 
 /// \brief Abstract strategy for cleaning up files after snapshot expiration.
@@ -67,14 +74,30 @@ class FileCleanupStrategy {
   ///
   /// \param metadata_before_expiration Table metadata before expiration.
   /// \param metadata_after_expiration Table metadata after expiration.
-  /// \param expired_snapshot_ids Snapshot IDs that were expired during this 
operation.
   /// \param level Controls which types of files are eligible for deletion.
   virtual Status CleanFiles(const TableMetadata& metadata_before_expiration,
                             const TableMetadata& metadata_after_expiration,
-                            const std::unordered_set<int64_t>& 
expired_snapshot_ids,
                             CleanupLevel level) = 0;
 
  protected:
+  /// \brief Snapshot IDs present in `before` but not in `after`.
+  static std::unordered_set<int64_t> ExpiredSnapshotIds(const TableMetadata& 
before,
+                                                        const TableMetadata& 
after) {
+    std::unordered_set<int64_t> after_ids;
+    after_ids.reserve(after.snapshots.size());
+    for (const auto& s : after.snapshots) {
+      if (s) after_ids.insert(s->snapshot_id);
+    }
+    std::unordered_set<int64_t> expired;
+    expired.reserve(before.snapshots.size());
+    for (const auto& s : before.snapshots) {
+      if (s && !after_ids.contains(s->snapshot_id)) {
+        expired.insert(s->snapshot_id);
+      }
+    }
+    return expired;
+  }
+
   /// \brief Delete a single file
   void DeleteFile(const std::string& path) {
     try {
@@ -84,11 +107,11 @@ class FileCleanupStrategy {
         std::ignore = file_io_->DeleteFile(path);
       }
     } catch (...) {
-      /// TODO(shangxinli): add retry
+      // TODO(shangxinli): add retry
     }
   }
 
-  /// TODO(shangxinli): Add bulk deletion
+  // TODO(shangxinli): Add bulk deletion
   void DeleteFiles(const std::unordered_set<std::string>& paths) {
     for (const auto& path : paths) {
       DeleteFile(path);
@@ -149,8 +172,10 @@ class ReachableFileCleanup : public FileCleanupStrategy {
 
   Status CleanFiles(const TableMetadata& metadata_before_expiration,
                     const TableMetadata& metadata_after_expiration,
-                    const std::unordered_set<int64_t>& expired_snapshot_ids,
                     CleanupLevel level) override {
+    const auto expired_snapshot_ids =
+        ExpiredSnapshotIds(metadata_before_expiration, 
metadata_after_expiration);
+
     std::unordered_set<int64_t> retained_snapshot_ids;
     for (const auto& snapshot : metadata_after_expiration.snapshots) {
       if (snapshot) {
@@ -182,7 +207,7 @@ class ReachableFileCleanup : public FileCleanupStrategy {
         if (level == CleanupLevel::kAll) {
           // Deleting data files
           auto data_files_to_delete = FindDataFilesToDelete(
-              metadata_after_expiration, manifests_to_delete, 
current_manifests);
+              metadata_before_expiration, manifests_to_delete, 
current_manifests);
           DeleteFiles(data_files_to_delete);
         }
 
@@ -195,8 +220,7 @@ class ReachableFileCleanup : public FileCleanupStrategy {
     DeleteFiles(manifest_lists_to_delete);
 
     // Deleting statistics files
-    if (HasAnyStatisticsFiles(metadata_before_expiration) ||
-        HasAnyStatisticsFiles(metadata_after_expiration)) {
+    if (HasAnyStatisticsFiles(metadata_before_expiration)) {
       DeleteFiles(
           StatisticsFilesToDelete(metadata_before_expiration, 
metadata_after_expiration));
     }
@@ -262,7 +286,7 @@ class ReachableFileCleanup : public FileCleanupStrategy {
                      "Cannot read data file paths from a delete manifest: {}",
                      manifest.manifest_path);
 
-    /// TODO(shangxinli): optimize by only reading file paths
+    // TODO(shangxinli): optimize by only reading file paths
     ICEBERG_ASSIGN_OR_RAISE(auto reader,
                             MakeManifestReader(manifest, file_io_, metadata));
     ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
@@ -331,6 +355,296 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Only safe when:
+///   * No snapshot IDs were explicitly listed for expiration.
+///   * No removed snapshots lived outside the current main ancestry.
+///   * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so
+/// two snapshot passes are enough -- one over retained snapshots to learn which
+/// manifests are still live, one over expired snapshots to learn which manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// Failure policy: a failure while reading a *retained* snapshot's manifests
+/// aborts cleanup before anything is deleted, because an incomplete live-manifest
+/// set would let the second pass delete manifests that are still referenced.
+/// Failures while reading *expired* snapshots or individual manifests only skip
+/// that item, which can leave orphaned files behind.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  /// \brief Delete files that stopped being referenced when the table moved from
+  /// `metadata_before_expiration` to `metadata_after_expiration`.
+  ///
+  /// \param metadata_before_expiration Table metadata before the expiration commit.
+  /// \param metadata_after_expiration Table metadata after the expiration commit.
+  /// \param level Only CleanupLevel::kAll deletes data files; manifests, manifest
+  ///        lists, and statistics files are deleted regardless of `level`.
+  /// \return OK when cleanup ran or was skipped; an error when a source-snapshot-id
+  ///         summary value of a current ancestor cannot be parsed or a retained
+  ///         snapshot's manifests cannot be read. Nothing is deleted in either
+  ///         error case.
+  Status CleanFiles(const TableMetadata& metadata_before_expiration,
+                    const TableMetadata& metadata_after_expiration,
+                    CleanupLevel level) override {
+    const auto expired_snapshot_ids =
+        ExpiredSnapshotIds(metadata_before_expiration, metadata_after_expiration);
+    if (expired_snapshot_ids.empty()) {
+      return {};
+    }
+
+    std::unordered_set<int64_t> valid_ids;
+    valid_ids.reserve(metadata_after_expiration.snapshots.size());
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (snapshot) {
+        valid_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    auto current_result = metadata_before_expiration.SnapshotById(
+        metadata_before_expiration.current_snapshot_id);
+    if (!current_result.has_value() || current_result.value() == nullptr) {
+      return {};
+    }
+
+    // Only delete files removed by ancestors of the current table state.
+    auto ancestors_result = SnapshotUtil::AncestorsOf(
+        current_result.value()->snapshot_id, [&metadata_before_expiration](int64_t id) {
+          return metadata_before_expiration.SnapshotById(id);
+        });
+    if (!ancestors_result.has_value()) {
+      return {};
+    }
+    std::unordered_set<int64_t> ancestor_ids;
+    ancestor_ids.reserve(ancestors_result.value().size());
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+    }
+
+    // Protect snapshots whose changes were picked into the current ancestry.
+    std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+    picked_ancestor_snapshot_ids.reserve(ancestor_ids.size());
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (!ancestor) continue;
+      const auto& summary = ancestor->summary;
+      auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (it == summary.end()) continue;
+      ICEBERG_ASSIGN_OR_RAISE(auto source_id,
+                              StringUtils::ParseNumber<int64_t>(it->second));
+      picked_ancestor_snapshot_ids.insert(source_id);
+    }
+
+    // Find manifests still referenced by a valid snapshot but written by an
+    // expired snapshot. Their deleted entries point at data files now safe to
+    // remove and become candidates for manifests_to_scan below.
+    std::unordered_set<std::string> valid_manifests;
+    std::unordered_set<ManifestFile> manifests_to_scan;
+    manifests_to_scan.reserve(expired_snapshot_ids.size());
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      // Not best-effort: if a retained snapshot's manifests were skipped here,
+      // valid_manifests would be incomplete and the expired-snapshot pass below
+      // could delete manifests this retained snapshot still references. Abort
+      // before any file has been deleted instead.
+      ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.Manifests(file_io_));
+      for (auto& manifest : manifests) {
+        // Record the path before `manifest` is possibly moved from below.
+        valid_manifests.insert(manifest.manifest_path);
+
+        int64_t writer_id = manifest.added_snapshot_id;
+        bool from_valid_snapshots = valid_ids.contains(writer_id);
+        bool is_from_ancestor = ancestor_ids.contains(writer_id);
+        bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id);
+        if (!from_valid_snapshots && (is_from_ancestor || is_picked) &&
+            manifest.has_deleted_files()) {
+          manifests_to_scan.insert(std::move(manifest));
+        }
+      }
+    }
+
+    // Find manifests that were only referenced by snapshots that have expired,
+    // and split them by what kind of cleanup they need:
+    //   - manifests_to_delete: not referenced by any retained snapshot;
+    //   - manifests_to_scan: from a current-state ancestor and has deleted
+    //     entries (data files now safe to drop);
+    //   - manifests_to_revert: written by an expiring non-ancestor snapshot
+    //     and contains added entries -- those data files were never adopted.
+    std::unordered_set<std::string> manifest_lists_to_delete;
+    manifest_lists_to_delete.reserve(expired_snapshot_ids.size());
+    std::unordered_set<std::string> manifests_to_delete;
+    manifests_to_delete.reserve(expired_snapshot_ids.size());
+    std::unordered_set<ManifestFile> manifests_to_revert;
+    manifests_to_revert.reserve(expired_snapshot_ids.size());
+    for (const auto& snapshot : metadata_before_expiration.snapshots) {
+      if (!snapshot) continue;
+      int64_t snapshot_id = snapshot->snapshot_id;
+      if (valid_ids.contains(snapshot_id)) continue;
+
+      // Skip cherry-picked snapshots; the picked snapshot owns its cleanup.
+      if (picked_ancestor_snapshot_ids.contains(snapshot_id)) {
+        continue;
+      }
+
+      int64_t source_snapshot_id = -1;
+      auto src_it = snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (src_it != snapshot->summary.end()) {
+        auto source_snapshot_id_result =
+            StringUtils::ParseNumber<int64_t>(src_it->second);
+        if (!source_snapshot_id_result.has_value()) {
+          // Unparseable provenance: leave this snapshot's files in place.
+          continue;
+        }
+        source_snapshot_id = source_snapshot_id_result.value();
+      }
+      // If this commit was cherry-picked from a still-live snapshot, skip it.
+      if (ancestor_ids.contains(source_snapshot_id) ||
+          picked_ancestor_snapshot_ids.contains(source_snapshot_id)) {
+        continue;
+      }
+
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io_);
+      if (!manifests_result.has_value()) {
+        // Skipping an expired snapshot only leaves orphans (its manifest list
+        // is not scheduled for deletion either), so this stays best-effort.
+        continue;
+      }
+
+      auto manifests = std::move(manifests_result).value();
+      for (auto& manifest : manifests) {
+        if (valid_manifests.contains(manifest.manifest_path)) continue;
+        manifests_to_delete.insert(manifest.manifest_path);
+
+        int64_t writer_id = manifest.added_snapshot_id;
+        bool is_from_ancestor = ancestor_ids.contains(writer_id);
+        bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id);
+
+        if (is_from_ancestor && manifest.has_deleted_files()) {
+          manifests_to_scan.insert(std::move(manifest));
+        } else if (!is_from_ancestor && is_from_expiring_snapshot &&
+                   manifest.has_added_files()) {
+          // The writer must be known-expired so missing history cannot make
+          // an ancestor look like a reverted snapshot.
+          manifests_to_revert.insert(std::move(manifest));
+        }
+      }
+      if (!snapshot->manifest_list.empty()) {
+        manifest_lists_to_delete.insert(snapshot->manifest_list);
+      }
+    }
+
+    // Deleting data files
+    if (level == CleanupLevel::kAll) {
+      // Manifests may reference partition specs that were pruned during expiration
+      // when CleanExpiredMetadata is enabled, so resolve schemas/specs against the
+      // pre-expiration metadata.
+      auto files_to_delete = FindFilesToDelete(
+          metadata_before_expiration, manifests_to_scan, manifests_to_revert, valid_ids);
+      DeleteFiles(files_to_delete);
+    }
+
+    // Deleting manifest files
+    DeleteFiles(manifests_to_delete);
+
+    // Deleting manifest-list files
+    DeleteFiles(manifest_lists_to_delete);
+
+    // Deleting statistics files
+    if (HasAnyStatisticsFiles(metadata_before_expiration)) {
+      DeleteFiles(
+          StatisticsFilesToDelete(metadata_before_expiration, metadata_after_expiration));
+    }
+
+    return {};
+  }
+
+ private:
+  /// \brief Resolve the data files that the incremental pass identified for deletion.
+  ///
+  /// For manifests_to_scan, collect DELETED entries whose snapshot id is present
+  /// and no longer valid. For manifests_to_revert, collect every ADDED entry.
+  /// Manifests that cannot be opened or read are skipped, so their files are
+  /// left in place.
+  ///
+  /// \param metadata Metadata used to open manifests (schemas/specs).
+  /// \param manifests_to_scan Manifests whose deleted entries may be removable.
+  /// \param manifests_to_revert Manifests whose added entries were never adopted.
+  /// \param valid_ids Snapshot ids that survive the expiration.
+  /// \return Paths of data files to delete.
+  std::unordered_set<std::string> FindFilesToDelete(
+      const TableMetadata& metadata,
+      const std::unordered_set<ManifestFile>& manifests_to_scan,
+      const std::unordered_set<ManifestFile>& manifests_to_revert,
+      const std::unordered_set<int64_t>& valid_ids) {
+    std::unordered_set<std::string> files_to_delete;
+
+    for (const auto& manifest : manifests_to_scan) {
+      auto reader_result = MakeManifestReader(manifest, file_io_, metadata);
+      if (!reader_result.has_value()) continue;
+      auto entries_result = reader_result.value()->Entries();
+      if (!entries_result.has_value()) continue;
+      for (const auto& entry : entries_result.value()) {
+        // Entries without a snapshot id are kept: their deleting commit is unknown.
+        if (entry.status == ManifestStatus::kDeleted && entry.snapshot_id.has_value() &&
+            !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) {
+          files_to_delete.insert(entry.data_file->file_path);
+        }
+      }
+    }
+
+    for (const auto& manifest : manifests_to_revert) {
+      auto reader_result = MakeManifestReader(manifest, file_io_, metadata);
+      if (!reader_result.has_value()) continue;
+      auto entries_result = reader_result.value()->Entries();
+      if (!entries_result.has_value()) continue;
+      for (const auto& entry : entries_result.value()) {
+        if (entry.status == ManifestStatus::kAdded && entry.data_file) {
+          files_to_delete.insert(entry.data_file->file_path);
+        }
+      }
+    }
+
+    return files_to_delete;
+  }
+};
+
+/// \brief Report whether `metadata` keeps any snapshot that is not reached by
+/// walking parents back from its current snapshot.
+///
+/// Answers true conservatively when the ancestry walk fails, and when there is
+/// no current snapshot while the snapshot list is non-empty.
+bool HasNonMainSnapshots(const TableMetadata& metadata) {
+  const auto head = metadata.SnapshotById(metadata.current_snapshot_id);
+  const bool has_head = head.has_value() && head.value() != nullptr;
+  if (!has_head) {
+    return !metadata.snapshots.empty();
+  }
+
+  const auto lineage = SnapshotUtil::AncestorsOf(
+      head.value()->snapshot_id,
+      [&metadata](int64_t id) { return metadata.SnapshotById(id); });
+  if (!lineage.has_value()) {
+    return true;
+  }
+
+  std::unordered_set<int64_t> lineage_ids;
+  for (const auto& ancestor : lineage.value()) {
+    if (ancestor != nullptr) {
+      lineage_ids.insert(ancestor->snapshot_id);
+    }
+  }
+  return std::ranges::any_of(metadata.snapshots, [&lineage_ids](const auto& snapshot) {
+    return snapshot != nullptr && !lineage_ids.contains(snapshot->snapshot_id);
+  });
+}
+
+/// \brief Report whether the expiration removed any snapshot that was not an
+/// ancestor of `before`'s current snapshot.
+///
+/// If `before` has no current snapshot, nothing counts as a main ancestor, so any
+/// removed snapshot makes this return true; that keeps Finalize() from choosing
+/// incremental cleanup when the before-state has snapshots but no current pointer.
+/// A failed ancestry walk also returns true.
+bool HasRemovedNonMainAncestors(const TableMetadata& before, const TableMetadata& after) {
+  std::unordered_set<int64_t> retained_ids;
+  retained_ids.reserve(after.snapshots.size());
+  for (const auto& kept : after.snapshots) {
+    if (kept != nullptr) {
+      retained_ids.insert(kept->snapshot_id);
+    }
+  }
+
+  std::unordered_set<int64_t> main_line;
+  const auto head = before.SnapshotById(before.current_snapshot_id);
+  if (head.has_value() && head.value() != nullptr) {
+    const auto lineage = SnapshotUtil::AncestorsOf(
+        head.value()->snapshot_id,
+        [&before](int64_t id) { return before.SnapshotById(id); });
+    if (!lineage.has_value()) {
+      return true;
+    }
+    for (const auto& ancestor : lineage.value()) {
+      if (ancestor != nullptr) {
+        main_line.insert(ancestor->snapshot_id);
+      }
+    }
+  }
+
+  return std::ranges::any_of(before.snapshots, [&](const auto& snapshot) {
+    return snapshot != nullptr && !retained_ids.contains(snapshot->snapshot_id) &&
+           !main_line.contains(snapshot->snapshot_id);
+  });
+}
+
 }  // namespace
 
 Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make(
@@ -464,7 +778,7 @@ Result<std::unordered_set<int64_t>> 
ExpireSnapshots::UnreferencedSnapshotIdsToRe
   for (const auto& snapshot : base().snapshots) {
     ICEBERG_DCHECK(snapshot != nullptr, "Snapshot is null");
     if (!referenced_ids.contains(snapshot->snapshot_id) &&
-        snapshot->timestamp_ms > default_expire_older_than_) {
+        snapshot->timestamp_ms >= default_expire_older_than_) {
       // unreferenced and not old enough to be expired
       ids_to_retain.insert(snapshot->snapshot_id);
     }
@@ -528,6 +842,8 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
     ICEBERG_PRECHECK(!retained_id_to_refs.contains(id),
                      "Cannot expire {}. Still referenced by refs", id);
   }
+  std::unordered_set<int64_t> 
explicit_snapshot_ids(snapshot_ids_to_expire_.begin(),
+                                                    
snapshot_ids_to_expire_.end());
   ICEBERG_ASSIGN_OR_RAISE(auto all_branch_snapshot_ids,
                           ComputeAllBranchSnapshotIdsToRetain(retained_refs));
   ICEBERG_ASSIGN_OR_RAISE(auto unreferenced_snapshot_ids,
@@ -544,8 +860,10 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
       result.refs_to_remove.push_back(key_to_ref.first);
     }
   });
-  std::ranges::for_each(base.snapshots, [&ids_to_retain, &result](const auto& 
snapshot) {
-    if (snapshot && !ids_to_retain.contains(snapshot->snapshot_id)) {
+  std::ranges::for_each(base.snapshots, [&explicit_snapshot_ids, 
&ids_to_retain,
+                                         &result](const auto& snapshot) {
+    if (snapshot && (explicit_snapshot_ids.contains(snapshot->snapshot_id) ||
+                     !ids_to_retain.contains(snapshot->snapshot_id))) {
       result.snapshot_ids_to_remove.push_back(snapshot->snapshot_id);
     }
   });
@@ -608,16 +926,24 @@ Status ExpireSnapshots::Finalize(Result<const 
TableMetadata*> commit_result) {
   auto metadata_before_expiration_ptr = 
apply_result_->metadata_before_expiration;
   const TableMetadata& metadata_before_expiration = 
*metadata_before_expiration_ptr;
   const TableMetadata& metadata_after_expiration = *commit_result.value();
-  std::unordered_set<int64_t> 
expired_ids(apply_result_->snapshot_ids_to_remove.begin(),
-                                          
apply_result_->snapshot_ids_to_remove.end());
   apply_result_.reset();
 
-  // File cleanup is best-effort: log and continue on individual file deletion 
failures
-  ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
-  return strategy.CleanFiles(metadata_before_expiration, 
metadata_after_expiration,
-                             expired_ids, cleanup_level_);
+  // Pick incremental cleanup when the expiration is a simple linear-ancestry 
walk:
+  // no explicit snapshot IDs, no removed snapshots outside main ancestry, and 
no
+  // retained snapshots outside main ancestry.
+  const bool can_use_incremental =
+      !specified_snapshot_id_ &&
+      !HasRemovedNonMainAncestors(metadata_before_expiration,
+                                  metadata_after_expiration) &&
+      !HasNonMainSnapshots(metadata_after_expiration);
+
+  if (can_use_incremental) {
+    return IncrementalFileCleanup(ctx_->table->io(), delete_func_)
+        .CleanFiles(metadata_before_expiration, metadata_after_expiration,
+                    cleanup_level_);
+  }
+  return ReachableFileCleanup(ctx_->table->io(), delete_func_)
+      .CleanFiles(metadata_before_expiration, metadata_after_expiration, 
cleanup_level_);
 }
 
-// TODO(shangxinli): add IncrementalFileCleanup strategy for linear ancestry 
optimization.
-
 }  // namespace iceberg
diff --git a/src/iceberg/update/expire_snapshots.h 
b/src/iceberg/update/expire_snapshots.h
index 7c1588aa..a5b6e3b3 100644
--- a/src/iceberg/update/expire_snapshots.h
+++ b/src/iceberg/update/expire_snapshots.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <functional>
 #include <memory>
+#include <optional>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>


Reply via email to