wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1763998376
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +128,124 @@ private static Map<Long, Integer>
computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}
- private static class CreateDataFileChangeTasks implements
CreateTasksFunction<ChangelogScanTask> {
- private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+ private static class DummyChangelogScanTask implements ChangelogScanTask {
+ public static final DummyChangelogScanTask INSTANCE = new
DummyChangelogScanTask();
- private final Map<Long, Integer> snapshotOrdinals;
+ private DummyChangelogScanTask() {}
- CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
- this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+ @Override
+ public ChangelogOperation operation() {
+ return ChangelogOperation.DELETE;
+ }
+
+ @Override
+ public int changeOrdinal() {
+ return 0;
+ }
+
+ @Override
+ public long commitSnapshotId() {
+ return 0L;
+ }
+ }
+
+ private static class CreateDataFileChangeTasks implements
CreateTasksFunction<ChangelogScanTask> {
+ private final long snapshotId;
+ private final long sequenceNumber;
+ private final int changeOrdinal;
+
+ CreateDataFileChangeTasks(long snapshotId, long sequenceNumber, int
changeOrdinal) {
+ this.snapshotId = snapshotId;
+ this.sequenceNumber = sequenceNumber;
+ this.changeOrdinal = changeOrdinal;
}
@Override
public CloseableIterable<ChangelogScanTask> apply(
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext
context) {
- return CloseableIterable.transform(
- entries,
- entry -> {
- long commitSnapshotId = entry.snapshotId();
- int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
- DataFile dataFile = entry.file().copy(context.shouldKeepStats());
-
- switch (entry.status()) {
- case ADDED:
- return new BaseAddedRowsScanTask(
- changeOrdinal,
- commitSnapshotId,
- dataFile,
- NO_DELETES,
- context.schemaAsString(),
- context.specAsString(),
- context.residuals());
-
- case DELETED:
- return new BaseDeletedDataFileScanTask(
- changeOrdinal,
- commitSnapshotId,
- dataFile,
- NO_DELETES,
- context.schemaAsString(),
- context.specAsString(),
- context.residuals());
-
- default:
- throw new IllegalArgumentException("Unexpected entry status: "
+ entry.status());
- }
- });
+ CloseableIterable<ChangelogScanTask> tasks =
+ CloseableIterable.transform(
+ entries,
+ entry -> {
+ long entrySnapshotId = entry.snapshotId();
+ DataFile dataFile =
entry.file().copy(context.shouldKeepStats());
+ DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
+ List<DeleteFile> added = Lists.newArrayList();
+ List<DeleteFile> existing = Lists.newArrayList();
+ for (DeleteFile deleteFile : deleteFiles) {
+ if (sequenceNumber == deleteFile.dataSequenceNumber()) {
+ added.add(deleteFile);
+ } else {
+ existing.add(deleteFile);
+ }
+ }
+ DeleteFile[] addedDeleteFiles = added.toArray(new
DeleteFile[0]);
+ DeleteFile[] existingDeleteFiles = existing.toArray(new
DeleteFile[0]);
Review Comment:
Previously, I used a map from each delete file to the snapshot in which it was
added to determine the `addedDeleteFiles` and `existingDeleteFiles`. However,
that map is computed only for the snapshots in the range of interest (for the
`BaseIncrementalChangelogScan`), so there may be no entry in the map
for a delete file that was added in a snapshot before this range. This caused
issues in the `FluentIterable::filter(Predicate)` I was using to filter the
delete files. The approach here (comparing each delete file's
`dataSequenceNumber()` with `sequenceNumber`) is simpler, and turns out to be
what @manuzhang used in his PR https://github.com/apache/iceberg/pull/9888.
Hat tip to Manu.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]