dramaticlly commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1765491165
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -132,6 +131,175 @@ public void testFileDeletes() {
assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
}
+ @TestTemplate
+ public void testRowDeletes() {
+ assumeThat(formatVersion).isEqualTo(2);
+
+ table
+ .newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_A2)
+ .appendFile(FILE_B)
+ .appendFile(FILE_C)
+ .commit();
+ Snapshot snap1 = table.currentSnapshot();
+
+ // position delete
+ table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ // equality delete
+ table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+ Snapshot snap3 = table.currentSnapshot();
+
+ // mix of position and equality deletes
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+ Snapshot snap4 = table.currentSnapshot();
+
+ IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+ DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+ assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+ assertThat(t1.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap2.snapshotId());
+ assertThat(t1.file().path()).as("Data file must
match").isEqualTo(FILE_B.path());
+ assertThat(t1.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_B_DELETES.path());
+ assertThat(t1.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+ assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+ assertThat(t2.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap3.snapshotId());
+ assertThat(t2.file().path()).as("Data file must
match").isEqualTo(FILE_C.path());
+ assertThat(t2.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_C2_DELETES.path());
+ assertThat(t2.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+ assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+ assertThat(t3.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap4.snapshotId());
+ assertThat(t3.file().path()).as("Data file must
match").isEqualTo(FILE_A2.path());
+ assertThat(t3.addedDeletes().size()).as("Number of added delete files must
match").isEqualTo(2);
+ assertThat(t3.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A2_DELETES.path());
+ assertThat(t3.addedDeletes().get(1).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A_DELETES.path());
+ assertThat(t3.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+ assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+ assertThat(t4.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap4.snapshotId());
+ assertThat(t4.file().path()).as("Data file must
match").isEqualTo(FILE_A.path());
+ assertThat(t4.addedDeletes().size()).as("Number of added delete files must
match").isEqualTo(2);
+ assertThat(t4.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A2_DELETES.path());
+ assertThat(t4.addedDeletes().get(1).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A_DELETES.path());
+ assertThat(t4.existingDeletes()).as("Must be no existing
deletes").isEmpty();
Review Comment:
I believe this is the core of the test I am looking for! I think there are
some AssertJ features we can leverage here, so I took a stab at organizing
this in a slightly different way.
I do notice that we use the file path instead of the object itself for
asserting equality on data and delete files, probably because once a file is
added to the table, its data and file sequence numbers are inherited from the
manifest entry.
I think we can probably reuse this `FILE_COMPARISON_CONFIG` from another class,
[`TestManifestReader`](https://github.com/apache/iceberg/blob/5ea78e3fbe5d8c9c846a1b86bcd2b77d13a31acd/core/src/test/java/org/apache/iceberg/TestManifestReader.java#L40):
```java
private static final RecursiveComparisonConfiguration
FILE_COMPARISON_CONFIG =
RecursiveComparisonConfiguration.builder()
.withIgnoredFields(
"dataSequenceNumber", "fileOrdinal", "fileSequenceNumber",
"fromProjectionPos")
.build();
```
Complete code:
- group assertions across all `ChangelogScanTask`s
- ensure all tasks are actually instances of `DeletedRowsScanTask` and assert
on their data files and existing/added delete files
- leverage `FILE_COMPARISON_CONFIG` for asserting equality
```java
assertThat(tasks)
.extracting(ChangelogScanTask::changeOrdinal)
.as("Ordinal must match")
.containsExactly(0, 1, 2, 2);
assertThat(tasks)
.extracting(ChangelogScanTask::commitSnapshotId)
.as("Snapshot must match")
.containsExactly(
snap2.snapshotId(), snap3.snapshotId(), snap4.snapshotId(),
snap4.snapshotId());
assertThat(tasks)
.extracting(ChangelogScanTask::operation)
.as("Operation must match")
.containsExactly(
ChangelogOperation.DELETE,
ChangelogOperation.DELETE,
ChangelogOperation.DELETE,
ChangelogOperation.DELETE);
List<DeletedRowsScanTask> deletedRowsScanTasks =
tasks.stream()
.filter(t -> t instanceof DeletedRowsScanTask)
.map(t -> (DeletedRowsScanTask) t)
.collect(Collectors.toList());
assertThat(deletedRowsScanTasks).as("Must have 4
DeletedRowsScanTask").hasSize(4);
assertThat(deletedRowsScanTasks)
.extracting(DeletedRowsScanTask::existingDeletes)
.as("Existing deletes shall be empty")
.allMatch(List::isEmpty);
DeletedRowsScanTask t1 = deletedRowsScanTasks.get(0);
assertThat(t1.file())
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Data file must match")
.isEqualTo(FILE_B);
assertThat(t1.addedDeletes().get(0))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_B_DELETES);
DeletedRowsScanTask t2 = deletedRowsScanTasks.get(1);
assertThat(t2.file())
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Data file must match")
.isEqualTo(FILE_C);
assertThat(t2.addedDeletes().get(0))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_C2_DELETES);
DeletedRowsScanTask t3 = deletedRowsScanTasks.get(2);
assertThat(t3.file())
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Data file must match")
.isEqualTo(FILE_A2);
assertThat(t3.addedDeletes()).as("Number of added delete files must
match").hasSize(2);
assertThat(t3.addedDeletes().get(0))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES);
assertThat(t3.addedDeletes().get(1))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES);
DeletedRowsScanTask t4 = deletedRowsScanTasks.get(3);
assertThat(t4.file())
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Data file must match")
.isEqualTo(FILE_A);
assertThat(t4.addedDeletes()).as("Number of added delete files must
match").hasSize(2);
assertThat(t4.addedDeletes().get(0))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_A2_DELETES);
assertThat(t4.addedDeletes().get(1))
.usingRecursiveComparison(FILE_COMPARISON_CONFIG)
.as("Added delete file must match")
.isEqualTo(FILE_A_DELETES);
```
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -132,6 +131,175 @@ public void testFileDeletes() {
assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
}
+ @TestTemplate
+ public void testRowDeletes() {
+ assumeThat(formatVersion).isEqualTo(2);
+
+ table
+ .newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_A2)
+ .appendFile(FILE_B)
+ .appendFile(FILE_C)
+ .commit();
+ Snapshot snap1 = table.currentSnapshot();
+
+ // position delete
+ table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ // equality delete
+ table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
+ Snapshot snap3 = table.currentSnapshot();
+
+ // mix of position and equality deletes
+
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
+ Snapshot snap4 = table.currentSnapshot();
+
+ IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ assertThat(tasks).as("Must have 4 tasks").hasSize(4);
+
+ DeletedRowsScanTask t1 = (DeletedRowsScanTask) tasks.get(0);
+ assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+ assertThat(t1.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap2.snapshotId());
+ assertThat(t1.file().path()).as("Data file must
match").isEqualTo(FILE_B.path());
+ assertThat(t1.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_B_DELETES.path());
+ assertThat(t1.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t2 = (DeletedRowsScanTask) tasks.get(1);
+ assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
+ assertThat(t2.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap3.snapshotId());
+ assertThat(t2.file().path()).as("Data file must
match").isEqualTo(FILE_C.path());
+ assertThat(t2.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_C2_DELETES.path());
+ assertThat(t2.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t3 = (DeletedRowsScanTask) tasks.get(2);
+ assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+ assertThat(t3.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap4.snapshotId());
+ assertThat(t3.file().path()).as("Data file must
match").isEqualTo(FILE_A2.path());
+ assertThat(t3.addedDeletes().size()).as("Number of added delete files must
match").isEqualTo(2);
+ assertThat(t3.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A2_DELETES.path());
+ assertThat(t3.addedDeletes().get(1).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A_DELETES.path());
+ assertThat(t3.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+
+ DeletedRowsScanTask t4 = (DeletedRowsScanTask) tasks.get(3);
+ assertThat(t4.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
+ assertThat(t4.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap4.snapshotId());
+ assertThat(t4.file().path()).as("Data file must
match").isEqualTo(FILE_A.path());
+ assertThat(t4.addedDeletes().size()).as("Number of added delete files must
match").isEqualTo(2);
+ assertThat(t4.addedDeletes().get(0).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A2_DELETES.path());
+ assertThat(t4.addedDeletes().get(1).path())
+ .as("Added delete file must match")
+ .isEqualTo(FILE_A_DELETES.path());
+ assertThat(t4.existingDeletes()).as("Must be no existing
deletes").isEmpty();
+ }
+
+ @TestTemplate
+ public void testAddingAndDeletingInSameCommit() {
+ assumeThat(formatVersion).isEqualTo(2);
+
+ table.newFastAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ IncrementalChangelogScan scan =
+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ assertThat(tasks).as("Must have 1 tasks").hasSize(1);
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+ assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
+ assertThat(t1.commitSnapshotId()).as("Snapshot must
match").isEqualTo(snap2.snapshotId());
+ assertThat(t1.file().path()).as("Data file must
match").isEqualTo(FILE_B.path());
+ assertThat(t1.deletes().get(0).path())
+ .as("Delete file must match")
+ .isEqualTo(FILE_B_DELETES.path());
+ }
+
+ @TestTemplate
+ public void testDeletingRowsInDataFileWithExistingDeletes() {
Review Comment:
love this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]