pvary commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1727847483
########## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ########## @@ -133,51 +131,149 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh return snapshotOrdinals; } - private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> { - private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private Map<String, Integer> computeDeleteFileToSnapshotOrdinal( + Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) { + Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap(); + + for (Snapshot snapshot : snapshots) { + Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(table().io()); + for (DeleteFile deleteFile : deleteFiles) { + deleteFileToSnapshotOrdinal.put( + deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId())); + } + } + + return deleteFileToSnapshotOrdinal; + } + + private static class DummyChangelogScanTask implements ChangelogScanTask { + public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask(); + private DummyChangelogScanTask() {} + + @Override + public ChangelogOperation operation() { + return ChangelogOperation.DELETE; + } + + @Override + public int changeOrdinal() { + return 0; + } + + @Override + public long commitSnapshotId() { + return 0L; + } + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> { + private final long snapshotId; + private final int changeOrdinal; private final Map<Long, Integer> snapshotOrdinals; + private final Map<String, Integer> deleteFileToSnapshotOrdinal; + + CreateDataFileChangeTasks( + long snapshotId, + Map<Long, Integer> snapshotOrdinals, + Map<String, Integer> deleteFileToSnapshotOrdinal) { + this.snapshotId = snapshotId; + this.snapshotOrdinals = snapshotOrdinals; + this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal; + this.changeOrdinal = this.snapshotOrdinals.get(snapshotId); + } + + private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal) + .toArray(DeleteFile.class); + } - CreateDataFileChangeTasks(Deque<Snapshot> snapshots) { - this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); + private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) { + return FluentIterable.from(deleteFiles) + .filter( + deleteFile -> + deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal) + .toArray(DeleteFile.class); } @Override public CloseableIterable<ChangelogScanTask> apply( CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) { - return CloseableIterable.transform( - entries, - entry -> { - long commitSnapshotId = entry.snapshotId(); - int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); - DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - - switch (entry.status()) { - case ADDED: - return new BaseAddedRowsScanTask( - changeOrdinal, - commitSnapshotId, - dataFile, - NO_DELETES, - context.schemaAsString(), - context.specAsString(), - context.residuals()); - - case DELETED: - return new BaseDeletedDataFileScanTask( - changeOrdinal, - commitSnapshotId, - dataFile, - NO_DELETES, - context.schemaAsString(), - context.specAsString(), - context.residuals()); - - default: - throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); - } - }); + CloseableIterable<ChangelogScanTask> tasks = + CloseableIterable.transform( + entries, + entry -> { + long entrySnapshotId = entry.snapshotId(); + DataFile dataFile = entry.file().copy(context.shouldKeepStats()); + DeleteFile[] addedDeleteFiles = filterAdded(context.deletes().forEntry(entry)); + + switch (entry.status()) { + case ADDED: + if (entrySnapshotId == snapshotId) { + return new BaseAddedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + context.schemaAsString(), + context.specAsString(), + context.residuals()); + } else { + // the data file is added before the snapshot we're processing + if (addedDeleteFiles.length == 0) { + return DummyChangelogScanTask.INSTANCE; + } else { + return new BaseDeletedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + filterExisting(context.deletes().forEntry(entry)), + context.schemaAsString(), + context.specAsString(), + context.residuals()); + } + } + + case DELETED: + if (entrySnapshotId == snapshotId) { + return new BaseDeletedDataFileScanTask( + changeOrdinal, + snapshotId, + dataFile, + filterExisting(context.deletes().forEntry(entry)), + context.schemaAsString(), + context.specAsString(), + context.residuals()); + } else { + return DummyChangelogScanTask.INSTANCE; + } + + case EXISTING: + if (addedDeleteFiles.length == 0) { + return DummyChangelogScanTask.INSTANCE; + } else { + return new BaseDeletedRowsScanTask( + changeOrdinal, + snapshotId, + dataFile, + addedDeleteFiles, + filterExisting(context.deletes().forEntry(entry)), + context.schemaAsString(), + context.specAsString(), + context.residuals()); + } + + default: + throw new IllegalArgumentException( + "Unexpected entry status: " + entry.status()); + } + }); + return CloseableIterable.filter(tasks, task -> !(task instanceof DummyChangelogScanTask)); Review Comment: Would `task != DummyChangelogScanTask.INSTANCE` work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org