dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1733661339


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +131,149 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
+      Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
+    Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+    for (Snapshot snapshot : snapshots) {
+      Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(table().io());
+      for (DeleteFile deleteFile : deleteFiles) {
+        deleteFileToSnapshotOrdinal.put(
+            deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId()));
+      }
+    }
+
+    return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+    public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask();
 
+    private DummyChangelogScanTask() {}
+
+    @Override
+    public ChangelogOperation operation() {
+      return ChangelogOperation.DELETE;
+    }
+
+    @Override
+    public int changeOrdinal() {
+      return 0;
+    }
+
+    @Override
+    public long commitSnapshotId() {
+      return 0L;
+    }
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private final long snapshotId;
+    private final int changeOrdinal;
     private final Map<Long, Integer> snapshotOrdinals;
+    private final Map<String, Integer> deleteFileToSnapshotOrdinal;
+
+    CreateDataFileChangeTasks(
+        long snapshotId,
+        Map<Long, Integer> snapshotOrdinals,
+        Map<String, Integer> deleteFileToSnapshotOrdinal) {
+      this.snapshotId = snapshotId;
+      this.snapshotOrdinals = snapshotOrdinals;
+      this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+      this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+    }
+
+    private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+          .toArray(DeleteFile.class);
+    }
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+          .toArray(DeleteFile.class);
     }

Review Comment:
   Can we use a comparator here to highlight the difference between added
deletes and existing deletes? I think you can compare each delete file's
snapshot ordinal against the current change ordinal, but it's hard to see
right now because the only difference between the two filters is `==` versus `<`.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,51 +131,149 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+  private Map<String, Integer> computeDeleteFileToSnapshotOrdinal(
+      Deque<Snapshot> snapshots, Map<Long, Integer> snapshotOrdinals) {
+    Map<String, Integer> deleteFileToSnapshotOrdinal = Maps.newHashMap();
+
+    for (Snapshot snapshot : snapshots) {
+      Iterable<DeleteFile> deleteFiles = snapshot.addedDeleteFiles(table().io());
+      for (DeleteFile deleteFile : deleteFiles) {
+        deleteFileToSnapshotOrdinal.put(
+            deleteFile.path().toString(), snapshotOrdinals.get(snapshot.snapshotId()));
+      }
+    }
+
+    return deleteFileToSnapshotOrdinal;
+  }
+
+  private static class DummyChangelogScanTask implements ChangelogScanTask {
+    public static final DummyChangelogScanTask INSTANCE = new DummyChangelogScanTask();
 
+    private DummyChangelogScanTask() {}
+
+    @Override
+    public ChangelogOperation operation() {
+      return ChangelogOperation.DELETE;
+    }
+
+    @Override
+    public int changeOrdinal() {
+      return 0;
+    }
+
+    @Override
+    public long commitSnapshotId() {
+      return 0L;
+    }
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private final long snapshotId;
+    private final int changeOrdinal;
     private final Map<Long, Integer> snapshotOrdinals;
+    private final Map<String, Integer> deleteFileToSnapshotOrdinal;
+
+    CreateDataFileChangeTasks(
+        long snapshotId,
+        Map<Long, Integer> snapshotOrdinals,
+        Map<String, Integer> deleteFileToSnapshotOrdinal) {
+      this.snapshotId = snapshotId;
+      this.snapshotOrdinals = snapshotOrdinals;
+      this.deleteFileToSnapshotOrdinal = deleteFileToSnapshotOrdinal;
+      this.changeOrdinal = this.snapshotOrdinals.get(snapshotId);
+    }
+
+    private DeleteFile[] filterAdded(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) == changeOrdinal)
+          .toArray(DeleteFile.class);
+    }
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    private DeleteFile[] filterExisting(DeleteFile[] deleteFiles) {
+      return FluentIterable.from(deleteFiles)
+          .filter(
+              deleteFile ->
+                  deleteFileToSnapshotOrdinal.get(deleteFile.path().toString()) < changeOrdinal)
+          .toArray(DeleteFile.class);
     }

Review Comment:
   Can we use a comparator here to highlight the difference between added
deletes and existing deletes? I think you can compare each delete file's
snapshot ordinal against the current change ordinal, but it's hard to see
right now because the only difference between the two filters is `==` versus `<`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to