wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1757706938
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +131,149 @@ private static Map<Long, Integer>
computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}
- private static class CreateDataFileChangeTasks implements
CreateTasksFunction<ChangelogScanTask> {
- private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+ private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
+ Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
+ Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+ for (Snapshot snapshot : snapshots) {
+ Iterable<DeleteFile> deleteFiles =
snapshot.addedDeleteFiles(table().io());
+ for (DeleteFile deleteFile : deleteFiles) {
+ deleteFileToSnapshotOrdinal.put(
+ deleteFile.path().toString(),
snapshotOrdinals.get(snapshot.snapshotId()));
+ }
+ }
+
+ return deleteFileToSnapshotOrdinal;
+ }
+
+ private static class DummyChangelogScanTask implements ChangelogScanTask {
+ public static final DummyChangelogScanTask INSTANCE = new
DummyChangelogScanTask();
+ private DummyChangelogScanTask() {}
+
+ @Override
+ public ChangelogOperation operation() {
+ return ChangelogOperation.DELETE;
+ }
+
+ @Override
+ public int changeOrdinal() {
+ return 0;
+ }
+
+ @Override
+ public long commitSnapshotId() {
+ return 0L;
+ }
+ }
+
+ private static class CreateDataFileChangeTasks implements
CreateTasksFunction<ChangelogScanTask> {
+ private final long snapshotId;
+ private final int changeOrdinal;
private final Map<Long, Integer> snapshotOrdinals;
+ private final Map<String, Integer> deleteFileToSnapshotOrdinal;
+
+ CreateDataFileChangeTasks(
+ long snapshotId,
+ Map<Long, Integer> snapshotOrdinals,
+ Map<String, Integer> deleteFileToSnapshotOrdinal) {
+ this.snapshotId = snapshotId;
+ this.snapshotOrdinals = snapshotOrdinals;
+ this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+ this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+ }
+
+ private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+ return FluentIterable.from(deleteFiles)
+ .filter(
+ deleteFile ->
+
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+ .toArray(DeleteFile.class);
+ }
- CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
- this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+ private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+ return FluentIterable.from(deleteFiles)
+ .filter(
+ deleteFile ->
+
deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
Review Comment:
I'm not sure I understand the suggestion. Could you elaborate?
I think the code here is quite clear, and the intent is apparent from
the method names: `filterAdded` keeps the delete files whose snapshot
ordinal is the same as (`==`) `changeOrdinal`, while `filterExisting` keeps
the delete files whose snapshot ordinal comes before (`<`) `changeOrdinal`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]