wgtmac commented on code in PR #590:
URL: https://github.com/apache/iceberg-cpp/pull/590#discussion_r2963633083
##########
src/iceberg/table_scan.cc:
##########
@@ -539,17 +641,75 @@ Result<std::vector<std::shared_ptr<FileScanTask>>>
DataTableScan::PlanFiles() co
// IncrementalAppendScan implementation
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
- [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
- [[maybe_unused]] std::shared_ptr<Schema> schema,
- [[maybe_unused]] std::shared_ptr<FileIO> io,
- [[maybe_unused]] internal::TableScanContext context) {
- return NotImplemented("IncrementalAppendScan is not implemented");
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
+ std::move(metadata), std::move(schema), std::move(io),
std::move(context)));
}
Result<std::vector<std::shared_ptr<FileScanTask>>>
IncrementalAppendScan::PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
- return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto ancestors_snapshots,
+ SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+ from_snapshot_id_exclusive));
+
+ std::vector<std::shared_ptr<Snapshot>> append_snapshots;
+ std::ranges::copy_if(ancestors_snapshots,
std::back_inserter(append_snapshots),
+ [](const auto& snapshot) {
+ return snapshot != nullptr &&
+ snapshot->Operation().has_value() &&
+ snapshot->Operation().value() ==
DataOperation::kAppend;
+ });
+ if (append_snapshots.empty()) {
+ return std::vector<std::shared_ptr<FileScanTask>>{};
+ }
+
+ std::unordered_set<int64_t> snapshot_ids;
+ std::ranges::transform(append_snapshots,
+ std::inserter(snapshot_ids, snapshot_ids.end()),
+ [](const auto& snapshot) { return
snapshot->snapshot_id; });
+
+ std::vector<ManifestFile> data_manifests;
+ for (const auto& snapshot : append_snapshots) {
+ SnapshotCache snapshot_cache(snapshot.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
+ std::ranges::copy_if(manifests, std::back_inserter(data_manifests),
Review Comment:
In Java's `BaseIncrementalAppendScan.java`, manifests from append snapshots
are collected into a `Set<ManifestFile>` so that a manifest referenced by
multiple snapshots is processed only once. The C++ implementation pushes
directly into `std::vector<ManifestFile> data_manifests`, which will cause
duplicate processing if a manifest is retained across multiple append
snapshots, since `ManifestGroup::Make` does not deduplicate them internally.

**Suggestion:** Deduplicate `data_manifests` (e.g., by tracking seen
`manifest_path`s in a `std::unordered_set<std::string>`) before passing them
to `ManifestGroup::Make`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]