wypoon commented on code in PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#discussion_r1726394464


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java:
##########
@@ -191,39 +214,361 @@ public void testMixDeleteAndInsert() throws IOException {
     table.newAppend().appendFile(dataFile2).commit();
     long snapshotId3 = table.currentSnapshot().snapshotId();
 
-    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, 
records2);
+
+    assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L), // id = 89
+            Pair.of(dataFile2.path(), 2L) // id = 122
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    table
+        .newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+    assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "d"), // id = 89
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    table.newRowDelta().addDeletes(eqDeletes).commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+    assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testMixOfPositionAndEqualityDeletes() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1.path(), 0L), // id = 29
+            Pair.of(dataFile1.path(), 3L) // id = 89
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes =
+        Lists.newArrayList(
+            dataDelete.copy("data", "a"), // id = 29
+            dataDelete.copy("data", "g") // id = 122
+            );
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            dataDeletes,
+            deleteRowSchema);
+
+    table
+        .newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
records3);
+
+    assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testAddingAndDeletingInSameCommit() throws IOException {
+    GenericRecord record = GenericRecord.create(table.schema());
+    List<Record> records1b = Lists.newArrayList();
+    records1b.add(record.copy("id", 28, "data", "a"));
+    records1b.add(record.copy("id", 29, "data", "a"));
+    records1b.add(record.copy("id", 43, "data", "b"));
+    records1b.add(record.copy("id", 44, "data", "b"));
+    records1b.add(record.copy("id", 61, "data", "c"));
+    records1b.add(record.copy("id", 89, "data", "d"));
+    DataFile dataFile1b = writeDataFile(records1b);
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile1b.path(), 0L), // id = 28
+            Pair.of(dataFile1b.path(), 3L) // id = 44
+            );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            deletes);
+
+    table
+        .newRowDelta()
+        .addRows(dataFile1b)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+    // the resulting records in the table are the same as records1
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table);
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
records2);
+
+    assertEquals("Should have expected rows", expectedRows, 
internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testFlinkScenario1() throws IOException {
+    List<Record> rl1 = Lists.newArrayList();
+    GenericRecord record = GenericRecord.create(table2.schema());
+    rl1.add(record.copy("id", 1, "data", "a"));
+    rl1.add(record.copy("id", 2, "data", "b"));
+    rl1.add(record.copy("id", 3, "data", "c"));
+    DataFile df1 = writeDataFile(rl1);
+    table2.newAppend().appendFile(df1).commit();
+    long snapshotId1 = table2.currentSnapshot().snapshotId();
+
+    // update row with id=2 by deleting it using an equality delete and writing a new row with id=2
+    Schema deleteRowSchema = table2.schema().select("id");
+    Record idDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> idDeletes = Lists.newArrayList(idDelete.copy("id", 2));
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table2,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            idDeletes,
+            deleteRowSchema);
+
+    List<Record> rl2 = Lists.newArrayList(record.copy("id", 2, "data", "bb"));
+    DataFile df2 = writeDataFile(rl2);
+
+    table2.newRowDelta().addDeletes(eqDeletes).addRows(df2).commit();
+    long snapshotId2 = table2.currentSnapshot().snapshotId();
+
+    // update row with id=2 again the same way
+    List<Record> idDeletes2 = Lists.newArrayList(idDelete.copy("id", 2));
+
+    DeleteFile eqDeletes2 =
+        FileHelpers.writeDeleteFile(
+            table2,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            TestHelpers.Row.of(0),
+            idDeletes2,
+            deleteRowSchema);
+
+    List<Record> rl3 = Lists.newArrayList(record.copy("id", 2, "data", "bbb"));
+    DataFile df3 = writeDataFile(rl3);
+
+    table2.newRowDelta().addDeletes(eqDeletes2).addRows(df3).commit();
+    long snapshotId3 = table2.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = getChangelogRows(table2);
+
+    List<Record> deleted1 = Lists.newArrayList(record.copy("id", 2, "data", 
"b"));
+    List<Record> deleted2 = Lists.newArrayList(record.copy("id", 2, "data", 
"bb"));
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, 
rl1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, 
deleted1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, 
rl2);
+    // in snapshot 3, eqDeletes2 applies to both df1 and df2, so there are two deletes emitted
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, 
deleted1);

Review Comment:
   I have fixed the expected results to remove this row.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to