wgtmac commented on code in PR #590:
URL: https://github.com/apache/iceberg-cpp/pull/590#discussion_r2963633083


##########
src/iceberg/table_scan.cc:
##########
@@ -539,17 +641,75 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> 
DataTableScan::PlanFiles() co
 // IncrementalAppendScan implementation
 
 Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
-    [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
-    [[maybe_unused]] std::shared_ptr<Schema> schema,
-    [[maybe_unused]] std::shared_ptr<FileIO> io,
-    [[maybe_unused]] internal::TableScanContext context) {
-  return NotImplemented("IncrementalAppendScan is not implemented");
+    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+  return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
+      std::move(metadata), std::move(schema), std::move(io), 
std::move(context)));
 }
 
 Result<std::vector<std::shared_ptr<FileScanTask>>> 
IncrementalAppendScan::PlanFiles(
     std::optional<int64_t> from_snapshot_id_exclusive,
     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::shared_ptr<Snapshot>> append_snapshots;
+  std::ranges::copy_if(ancestors_snapshots, 
std::back_inserter(append_snapshots),
+                       [](const auto& snapshot) {
+                         return snapshot != nullptr &&
+                                snapshot->Operation().has_value() &&
+                                snapshot->Operation().value() == 
DataOperation::kAppend;
+                       });
+  if (append_snapshots.empty()) {
+    return std::vector<std::shared_ptr<FileScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::ranges::transform(append_snapshots,
+                         std::inserter(snapshot_ids, snapshot_ids.end()),
+                         [](const auto& snapshot) { return 
snapshot->snapshot_id; });
+
+  std::vector<ManifestFile> data_manifests;
+  for (const auto& snapshot : append_snapshots) {
+    SnapshotCache snapshot_cache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
+    std::ranges::copy_if(manifests, std::back_inserter(data_manifests),

Review Comment:
   In Java's `BaseIncrementalAppendScan.java`, manifests from append snapshots 
are collected into a `Set<ManifestFile>` to prevent duplicate processing if 
multiple snapshots reference the same manifest. The C++ implementation pushes 
directly into `std::vector<ManifestFile> data_manifests`, which will cause 
duplicate processing if a manifest is retained across multiple append snapshots 
since `ManifestGroup::Make` does not deduplicate them internally.
   
   **Suggestion:** Deduplicate `data_manifests` (e.g., by tracking seen 
`manifest_path`s using a `std::unordered_set<std::string>`) before passing them 
to `ManifestGroup::Make`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to