wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1759573665
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
-
-    ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
-            .specsById(table().specs())
-            .caseSensitive(isCaseSensitive())
-            .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting()
-            .columnsToKeepStats(columnsToKeepStats());
-
-    if (shouldIgnoreResiduals()) {
-      manifestGroup = manifestGroup.ignoreResiduals();
-    }
-
-    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
-      manifestGroup = manifestGroup.planWith(planExecutor());
-    }
+    // map of delete file to the snapshot where the delete file is added
+    // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal
+    Map<String, Integer> deleteFileToSnapshotOrdinal =
+        computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals);
-    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+    Iterable<CloseableIterable<ChangelogScanTask>> plans =
+        FluentIterable.from(changelogSnapshots)
+            .transform(
+                snapshot -> {
+                  List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
+                  List<ManifestFile> deleteManifests = snapshot.deleteManifests(table().io());
+
+                  ManifestGroup manifestGroup =
+                      new ManifestGroup(table().io(), dataManifests, deleteManifests)
+                          .specsById(table().specs())
+                          .caseSensitive(isCaseSensitive())
+                          .select(scanColumns())
+                          .filterData(filter())
+                          .columnsToKeepStats(columnsToKeepStats());
+
+                  if (shouldIgnoreResiduals()) {
+                    manifestGroup = manifestGroup.ignoreResiduals();
+                  }
+
+                  if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
Review Comment:
I see where you're coming from.
   Before this change, there was a single call to `ManifestGroup.plan`. The `ManifestGroup` contained the data manifests from all the snapshots in the range of interest; only manifest entries added in one of those snapshots were considered, manifests containing only existing entries were ignored, and that one `plan` call generated all the scan tasks. This approach only works in the copy-on-write case, where there are no delete files: with copy-on-write, a delete always rewrites data files, so new data files are added and show up in the `ManifestGroup`.
   When we have delete files, the situation you describe can easily arise: a snapshot adds no new data files, only new delete files. That is why we need a different approach.
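   To make that concrete, here is a rough sketch of the case in question using the public API (not compilable as-is; `table`, `deleteFile`, `startSnapshotId`, and `endSnapshotId` are placeholders): a merge-on-read delete commits only delete files, so the resulting snapshot adds no data files, and the old planning logic could not surface those deleted rows.

```java
// A snapshot that adds only delete files (e.g. a merge-on-read DELETE).
table.newRowDelta()
    .addDeletes(deleteFile) // position/equality deletes against already-committed data files
    .commit();

// Under the old logic this snapshot contributed nothing to the changelog:
// none of its data manifest entries carry its snapshot id, existing entries
// were ignored, and delete manifests were never read.
try (CloseableIterable<ChangelogScanTask> tasks =
    table
        .newIncrementalChangelogScan()
        .fromSnapshotExclusive(startSnapshotId)
        .toSnapshot(endSnapshotId)
        .planFiles()) {
  tasks.forEach(task -> System.out.println(task.changeOrdinal() + " " + task.operation()));
} // the enclosing method has to handle the IOException from close()
```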
   With this change, a `ManifestGroup` is created for each snapshot in the range of interest, and `ManifestGroup.plan` is called on each of them; the resulting `CloseableIterable`s of scan tasks are then concatenated into a single `CloseableIterable`. We no longer ignore data manifests with only existing entries; in fact, we have to consider them. In a snapshot that adds no data files, only delete files, we look at the existing data files and check whether any of the new deletes apply to them.
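   Roughly, the new planning has this shape (a simplified sketch of the diff above, not the exact code; `createTasksFor(snapshot)` stands in for the real `CreateDataFileChangeTasks` construction, whose arguments are not shown in the quoted hunk):

```java
Iterable<CloseableIterable<ChangelogScanTask>> plans =
    FluentIterable.from(changelogSnapshots)
        .transform(
            snapshot -> {
              ManifestGroup group =
                  new ManifestGroup(
                          table().io(),
                          snapshot.dataManifests(table().io()),   // existing and added data files
                          snapshot.deleteManifests(table().io())) // deletes visible in this snapshot
                      .specsById(table().specs())
                      .caseSensitive(isCaseSensitive())
                      .select(scanColumns())
                      .filterData(filter())
                      .columnsToKeepStats(columnsToKeepStats());
              // one plan() call per snapshot in the range
              return group.plan(createTasksFor(snapshot));
            });

// concatenate the per-snapshot task iterables into the single result
return CloseableIterable.concat(plans);
```

   As far as I can tell, this also keeps planning lazy: `FluentIterable.transform` and `CloseableIterable.concat` both defer work until the result is iterated.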
   Now, each call to `ManifestGroup.plan` is still only parallelizable over the data manifest files in that particular `ManifestGroup`. Additional parallelism is possible in principle: since we make multiple `ManifestGroup.plan` calls (on different `ManifestGroup` instances), we could distribute every unit of work from all of those calls across one executor service. But I'm not aware of an API for that, and it is not easy to do with the API we have, I think.
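   To illustrate the bound (mirroring the guard in the diff above): the executor only helps inside a single group, so a range of, say, ten snapshots that each carry one data manifest is planned with no parallelism at all, however large the worker pool is.

```java
// Parallelism is decided per snapshot, bounded by that snapshot's own
// data manifest count (same guard as in the diff):
if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
  // fans out over this group's data manifests only; units of work from the
  // other snapshots' plan() calls cannot be mixed into the same pass
  manifestGroup = manifestGroup.planWith(planExecutor());
}
```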
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]