shangxinli commented on code in PR #592:
URL: https://github.com/apache/iceberg-cpp/pull/592#discussion_r3035741060


##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io) {
+  const TableMetadata& metadata = base();
+  auto schema_result = metadata.Schema();
+  if (!schema_result.has_value()) return 
std::unexpected<Error>(schema_result.error());
+  auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
+  if (!spec_result.has_value()) return 
std::unexpected<Error>(spec_result.error());
+  return ManifestReader::Make(manifest, file_io, schema_result.value(),
+                              spec_result.value());
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+    // Cache manifest metadata for later use in FindDataFilesToDelete,
+    // avoiding O(M*S) repeated I/O from re-reading manifest lists.
+    manifest_cache_.emplace(manifest.manifest_path, manifest);
+  }
+
+  return {};
+}
+
+Result<std::unordered_set<std::string>> ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests) {
+  auto file_io = ctx_->table->io();
+  std::unordered_set<std::string> data_files_to_delete;
+
+  // Step 1: Collect live file paths from manifests being deleted.
+  // Use LiveEntries() (ADDED/EXISTING only) to match Java's 
ManifestFiles.readPaths
+  // which delegates to liveEntries(). Using Entries() would include DELETED 
entries
+  // and could cause storage leaks.
+  for (const auto& [path, manifest] : manifest_cache_) {
+    if (!manifests_to_delete.contains(path)) continue;
+
+    auto reader_result = MakeManifestReader(manifest, file_io);
+    if (!reader_result.has_value()) continue;
+
+    auto entries_result = reader_result.value()->LiveEntries();
+    if (!entries_result.has_value()) continue;
+
+    for (const auto& entry : entries_result.value()) {
+      if (entry.data_file) {
+        data_files_to_delete.insert(entry.data_file->file_path);
+      }
+    }
+  }
+
+  if (data_files_to_delete.empty()) {
+    return data_files_to_delete;
+  }
+
+  // Step 2: Remove any files that are still referenced by retained manifests.
+  // If reading a retained manifest fails, we must NOT delete its data files
+  // to avoid accidental data loss (matching Java's retry + 
throwFailureWhenFinished).
+  for (const auto& manifest_path : retained_manifests) {
+    if (data_files_to_delete.empty()) break;
+
+    auto it = manifest_cache_.find(manifest_path);
+    if (it == manifest_cache_.end()) continue;
+
+    auto reader_result = MakeManifestReader(it->second, file_io);
+    if (!reader_result.has_value()) {
+      // Cannot read a retained manifest — abort data file deletion to prevent
+      // accidental data loss. Java retries and throws on failure here.
+      return std::unordered_set<std::string>{};
+    }
+
+    auto entries_result = reader_result.value()->LiveEntries();
+    if (!entries_result.has_value()) {
+      return std::unordered_set<std::string>{};
+    }
+
+    for (const auto& entry : entries_result.value()) {
+      if (entry.data_file) {
+        data_files_to_delete.erase(entry.data_file->file_path);
+      }
+    }
+  }
+
+  return data_files_to_delete;
+}
+
+Status ExpireSnapshots::CleanExpiredFiles(
+    const std::vector<int64_t>& expired_snapshot_ids) {
+  const TableMetadata& metadata = base();
+
+  // Build expired and retained snapshot ID sets.
+  // The retained set includes ALL snapshots referenced by any branch or tag,
+  // since Apply() already computed retention across all refs.
+  std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(),
+                                             expired_snapshot_ids.end());
+  std::unordered_set<int64_t> retained_snapshot_ids;
+  for (const auto& snapshot : metadata.snapshots) {
+    if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) {
+      retained_snapshot_ids.insert(snapshot->snapshot_id);
+    }
+  }
+
+  // Phase 1: Collect manifest paths from expired and retained snapshots.
+  // TODO(shangxinli): Parallelize manifest collection with a thread pool.
+  std::unordered_set<std::string> expired_manifest_paths;
+  for (int64_t snapshot_id : expired_snapshot_ids) {
+    std::ignore = ReadManifestsForSnapshot(snapshot_id, 
expired_manifest_paths);

Review Comment:
    `ReadManifestsForSnapshot` now returns `void` (instead of a `Status` that 
this call site discarded via `std::ignore`) because it is intentionally 
best-effort. For expired snapshots, a failure to read manifests means some 
files may not be deleted, but those orphaned files can be cleaned up by GC 
later. This is the conservative direction: never delete what we can't confirm 
is unreachable. For retained manifests in `FindDataFilesToDelete`, read 
failures still abort data file deletion entirely to prevent accidental data 
loss, matching Java's `throwFailureWhenFinished` behavior there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to