wgtmac commented on code in PR #592:
URL: https://github.com/apache/iceberg-cpp/pull/592#discussion_r3099347039


##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -37,6 +43,264 @@
 
 namespace iceberg {
 
+namespace {
+
+Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
+    const TableMetadata& metadata) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          metadata.PartitionSpecById(manifest.partition_spec_id));
+  return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
+}
+
+/// \brief Abstract strategy for cleaning up files after snapshot expiration.
+///
+/// Mirrors Java's FileCleanupStrategy: provides shared delete utilities while
+/// allowing different cleanup algorithms (ReachableFileCleanup, IncrementalFileCleanup).
+class FileCleanupStrategy {
+ public:
+  FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
+                      std::function<void(const std::string&)> delete_func)
+      : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
+
+  virtual ~FileCleanupStrategy() = default;
+
+  /// \brief Clean up files that are only reachable by expired snapshots.
+  ///
+  /// \param metadata Table metadata before expiration (contains all snapshots).
+  /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation.
+  /// \param level Controls which types of files are eligible for deletion.
+  virtual Status CleanFiles(const TableMetadata& metadata,
+                            const std::unordered_set<int64_t>& expired_snapshot_ids,
+                            CleanupLevel level) = 0;
+
+ protected:
+  /// \brief Delete a file, suppressing errors (best-effort).
+  ///
+  /// Uses the custom delete function if set, otherwise FileIO::DeleteFile.
+  /// Matches Java's suppressFailureWhenFinished behavior.
+  void DeleteFile(const std::string& path) {
+    try {
+      if (delete_func_) {
+        delete_func_(path);
+      } else {
+        std::ignore = file_io_->DeleteFile(path);
+      }
+    } catch (...) {
+      // Suppress all exceptions during file cleanup to match Java's
+      // suppressFailureWhenFinished behavior.
+    }
+  }
+
+  /// \brief Returns paths of statistics files referenced only by expired snapshots.
+  ///
+  /// Uses path-based set difference (matching Java's expiredStatisticsFilesLocations):
+  /// if the same file path is shared across snapshots, it is only deleted when
+  /// no retained snapshot references it.
+  std::unordered_set<std::string> ExpiredStatisticsFilePaths(
+      const TableMetadata& metadata, const std::unordered_set<int64_t>& expired_ids) {
+    std::unordered_set<std::string> retained_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && !expired_ids.contains(stat->snapshot_id)) {
+        retained_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && !expired_ids.contains(part_stat->snapshot_id)) {
+        retained_paths.insert(part_stat->path);
+      }
+    }
+
+    std::unordered_set<std::string> expired_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && expired_ids.contains(stat->snapshot_id) &&
+          !retained_paths.contains(stat->path)) {
+        expired_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && expired_ids.contains(part_stat->snapshot_id) &&
+          !retained_paths.contains(part_stat->path)) {
+        expired_paths.insert(part_stat->path);
+      }
+    }
+    return expired_paths;
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::function<void(const std::string&)> delete_func_;
+};
+
+/// \brief File cleanup strategy that determines safe deletions via full reachability.
+///
+/// Mirrors Java's ReachableFileCleanup: collects manifests from all expired and
+/// retained snapshots, prunes candidates still referenced by retained snapshots,
+/// then deletes orphaned manifests, data files, manifest lists, and statistics files.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class ReachableFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    std::unordered_set<int64_t> retained_snapshot_ids;
+    for (const auto& snapshot : metadata.snapshots) {
+      if (snapshot && !expired_snapshot_ids.contains(snapshot->snapshot_id)) {
+        retained_snapshot_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    // Phase 1: Collect manifest paths from expired and retained snapshots.
+    // The manifest_cache_ is populated here to avoid O(M*S) repeated I/O in
+    // FindDataFilesToDelete.
+    std::unordered_set<std::string> expired_manifest_paths;
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      ReadManifestsForSnapshot(metadata, snapshot_id, expired_manifest_paths);
+    }
+    std::unordered_set<std::string> retained_manifest_paths;
+    for (int64_t snapshot_id : retained_snapshot_ids) {
+      ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths);
+    }
+
+    // Phase 2: Prune manifests still referenced by retained snapshots.
+    std::unordered_set<std::string> manifests_to_delete;
+    for (const auto& path : expired_manifest_paths) {
+      if (!retained_manifest_paths.contains(path)) {
+        manifests_to_delete.insert(path);
+      }
+    }
+
+    // Phase 3: Delete data files if cleanup level is kAll.
+    if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) {
+      auto data_files_result =
+          FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths);
+      if (data_files_result.has_value()) {
+        for (const auto& path : data_files_result.value()) {
+          DeleteFile(path);
+        }
+      }
+    }
+
+    // Phase 4: Delete orphaned manifest files.
+    for (const auto& path : manifests_to_delete) {
+      DeleteFile(path);
+    }
+
+    // Phase 5: Delete manifest lists from expired snapshots.
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      auto snapshot_result = metadata.SnapshotById(snapshot_id);
+      if (!snapshot_result.has_value()) continue;
+      const auto& snapshot = snapshot_result.value();
+      if (!snapshot->manifest_list.empty()) {
+        DeleteFile(snapshot->manifest_list);
+      }
+    }
+
+    // Phase 6: Delete expired statistics files using path-based set difference.

Review Comment:
   [P1] Stats files are deleted even though snapshot removal still leaves their
metadata entries behind. This PR now deletes expired statistics and
partition-statistics files in `Finalize`, but C++ snapshot removal does not yet
remove the corresponding `statistics` / `partition_statistics` metadata entries
(`src/iceberg/table_metadata.cc` still has a FIXME where it should call
`RemoveStatistics` / `RemovePartitionStatistics`). As a result, a table can
commit metadata that still references a stats file and then immediately delete
that file on disk, leaving the committed metadata pointing at a missing file.
Java removes these metadata entries as part of snapshot removal and has
regression tests covering both expiration and reuse of stats files. Please
either wire snapshot removal to drop the metadata entries first, or defer
physical stats-file deletion until that is implemented.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -37,6 +43,264 @@
 
 namespace iceberg {
 
+namespace {
+
+Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
+    const TableMetadata& metadata) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          metadata.PartitionSpecById(manifest.partition_spec_id));
+  return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
+}
+
+/// \brief Abstract strategy for cleaning up files after snapshot expiration.
+///
+/// Mirrors Java's FileCleanupStrategy: provides shared delete utilities while
+/// allowing different cleanup algorithms (ReachableFileCleanup, IncrementalFileCleanup).
+class FileCleanupStrategy {
+ public:
+  FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
+                      std::function<void(const std::string&)> delete_func)
+      : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
+
+  virtual ~FileCleanupStrategy() = default;
+
+  /// \brief Clean up files that are only reachable by expired snapshots.
+  ///
+  /// \param metadata Table metadata before expiration (contains all snapshots).
+  /// \param expired_snapshot_ids Snapshot IDs that were expired during this operation.
+  /// \param level Controls which types of files are eligible for deletion.
+  virtual Status CleanFiles(const TableMetadata& metadata,
+                            const std::unordered_set<int64_t>& expired_snapshot_ids,
+                            CleanupLevel level) = 0;
+
+ protected:
+  /// \brief Delete a file, suppressing errors (best-effort).
+  ///
+  /// Uses the custom delete function if set, otherwise FileIO::DeleteFile.
+  /// Matches Java's suppressFailureWhenFinished behavior.
+  void DeleteFile(const std::string& path) {
+    try {
+      if (delete_func_) {
+        delete_func_(path);
+      } else {
+        std::ignore = file_io_->DeleteFile(path);
+      }
+    } catch (...) {
+      // Suppress all exceptions during file cleanup to match Java's
+      // suppressFailureWhenFinished behavior.
+    }
+  }
+
+  /// \brief Returns paths of statistics files referenced only by expired snapshots.
+  ///
+  /// Uses path-based set difference (matching Java's expiredStatisticsFilesLocations):
+  /// if the same file path is shared across snapshots, it is only deleted when
+  /// no retained snapshot references it.
+  std::unordered_set<std::string> ExpiredStatisticsFilePaths(
+      const TableMetadata& metadata, const std::unordered_set<int64_t>& expired_ids) {
+    std::unordered_set<std::string> retained_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && !expired_ids.contains(stat->snapshot_id)) {
+        retained_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && !expired_ids.contains(part_stat->snapshot_id)) {
+        retained_paths.insert(part_stat->path);
+      }
+    }
+
+    std::unordered_set<std::string> expired_paths;
+    for (const auto& stat : metadata.statistics) {
+      if (stat && expired_ids.contains(stat->snapshot_id) &&
+          !retained_paths.contains(stat->path)) {
+        expired_paths.insert(stat->path);
+      }
+    }
+    for (const auto& part_stat : metadata.partition_statistics) {
+      if (part_stat && expired_ids.contains(part_stat->snapshot_id) &&
+          !retained_paths.contains(part_stat->path)) {
+        expired_paths.insert(part_stat->path);
+      }
+    }
+    return expired_paths;
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::function<void(const std::string&)> delete_func_;
+};
+
+/// \brief File cleanup strategy that determines safe deletions via full reachability.
+///
+/// Mirrors Java's ReachableFileCleanup: collects manifests from all expired and
+/// retained snapshots, prunes candidates still referenced by retained snapshots,
+/// then deletes orphaned manifests, data files, manifest lists, and statistics files.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class ReachableFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    std::unordered_set<int64_t> retained_snapshot_ids;
+    for (const auto& snapshot : metadata.snapshots) {
+      if (snapshot && !expired_snapshot_ids.contains(snapshot->snapshot_id)) {
+        retained_snapshot_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    // Phase 1: Collect manifest paths from expired and retained snapshots.
+    // The manifest_cache_ is populated here to avoid O(M*S) repeated I/O in
+    // FindDataFilesToDelete.
+    std::unordered_set<std::string> expired_manifest_paths;
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      ReadManifestsForSnapshot(metadata, snapshot_id, expired_manifest_paths);
+    }
+    std::unordered_set<std::string> retained_manifest_paths;
+    for (int64_t snapshot_id : retained_snapshot_ids) {
+      ReadManifestsForSnapshot(metadata, snapshot_id, retained_manifest_paths);
+    }
+
+    // Phase 2: Prune manifests still referenced by retained snapshots.
+    std::unordered_set<std::string> manifests_to_delete;
+    for (const auto& path : expired_manifest_paths) {
+      if (!retained_manifest_paths.contains(path)) {
+        manifests_to_delete.insert(path);
+      }
+    }
+
+    // Phase 3: Delete data files if cleanup level is kAll.
+    if (level == CleanupLevel::kAll && !manifests_to_delete.empty()) {
+      auto data_files_result =
+          FindDataFilesToDelete(metadata, manifests_to_delete, retained_manifest_paths);
+      if (data_files_result.has_value()) {
+        for (const auto& path : data_files_result.value()) {
+          DeleteFile(path);
+        }
+      }
+    }
+
+    // Phase 4: Delete orphaned manifest files.
+    for (const auto& path : manifests_to_delete) {
+      DeleteFile(path);
+    }
+
+    // Phase 5: Delete manifest lists from expired snapshots.
+    for (int64_t snapshot_id : expired_snapshot_ids) {
+      auto snapshot_result = metadata.SnapshotById(snapshot_id);
+      if (!snapshot_result.has_value()) continue;
+      const auto& snapshot = snapshot_result.value();
+      if (!snapshot->manifest_list.empty()) {
+        DeleteFile(snapshot->manifest_list);
+      }
+    }
+
+    // Phase 6: Delete expired statistics files using path-based set difference.
+    for (const auto& path : ExpiredStatisticsFilePaths(metadata, expired_snapshot_ids)) {
+      DeleteFile(path);
+    }
+
+    return {};
+  }
+
+ private:
+  /// Cache of manifest path -> ManifestFile, populated during Phase 1 to avoid
+  /// re-reading manifest lists in FindDataFilesToDelete.
+  std::unordered_map<std::string, ManifestFile> manifest_cache_;
+
+  /// \brief Collect manifest paths for a snapshot into manifest_paths.
+  ///
+  /// Best-effort: if the snapshot or its manifest list cannot be read, the error

Review Comment:
   [P0] Retained-manifest read failures can delete live manifests.
`ReadManifestsForSnapshot` silently returns when a retained snapshot's manifest
list cannot be read, yet phase 2 still computes `manifests_to_delete` from that
incomplete retained set, and phase 4 then deletes those manifests. As a result,
a transient read failure while enumerating the current table state can delete
manifest files that live snapshots still reference. The same incomplete
`retained_manifest_paths` set is also passed to `FindDataFilesToDelete` in
phase 3, so with `CleanupLevel::kAll` live data files are at risk too. Java's
`ReachableFileCleanup` aborts cleanup on this path (`throwFailureWhenFinished`)
specifically to avoid deleting live metadata. We should therefore either fail,
or skip manifest (and data-file) deletion entirely, when the retained manifests
cannot be fully enumerated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to