dramaticlly commented on code in PR #10935: URL: https://github.com/apache/iceberg/pull/10935#discussion_r1755259576
########## core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java: ########## @@ -63,33 +60,43 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles( return CloseableIterable.empty(); } - Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots); + Map<Long, Integer> snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); - Set<ManifestFile> newDataManifests = - FluentIterable.from(changelogSnapshots) - .transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) - .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) - .toSet(); - - ManifestGroup manifestGroup = - new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .select(scanColumns()) - .filterData(filter()) - .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) - .ignoreExisting() - .columnsToKeepStats(columnsToKeepStats()); - - if (shouldIgnoreResiduals()) { - manifestGroup = manifestGroup.ignoreResiduals(); - } - - if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { - manifestGroup = manifestGroup.planWith(planExecutor()); - } + // map of delete file to the snapshot where the delete file is added + // the delete file is keyed by its path, and the snapshot is represented by the snapshot ordinal + Map<String, Integer> deleteFileToSnapshotOrdinal = + computeDeleteFileToSnapshotOrdinal(changelogSnapshots, snapshotOrdinals); Review Comment: yeah I agree the assessment ########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java: ########## @@ -191,39 +214,359 @@ public void testMixDeleteAndInsert() throws IOException { table.newAppend().appendFile(dataFile2).commit(); long snapshotId3 = table.currentSnapshot().snapshotId(); - CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks(); + List<InternalRow> rows = getChangelogRows(table); + + List<Object[]> 
expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testPositionDeletes() throws IOException { + table.newAppend().appendFile(dataFile1).commit(); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + table.newAppend().appendFile(dataFile2).commit(); + long snapshotId2 = table.currentSnapshot().snapshotId(); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of(dataFile1.path(), 0L), // id = 29 + Pair.of(dataFile1.path(), 3L), // id = 89 + Pair.of(dataFile2.path(), 2L) // id = 122 + ); + + Pair<DeleteFile, CharSequenceSet> posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); + + table + .newRowDelta() + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + long snapshotId3 = table.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testEqualityDeletes() throws IOException { + table.newAppend().appendFile(dataFile1).commit(); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + table.newAppend().appendFile(dataFile2).commit(); + long snapshotId2 = 
table.currentSnapshot().snapshotId(); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List<Record> dataDeletes = + Lists.newArrayList( + dataDelete.copy("data", "a"), // id = 29 + dataDelete.copy("data", "d"), // id = 89 + dataDelete.copy("data", "g") // id = 122 + ); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + + table.newRowDelta().addDeletes(eqDeletes).commit(); + long snapshotId3 = table.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testMixOfPositionAndEqualityDeletes() throws IOException { + table.newAppend().appendFile(dataFile1).commit(); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + table.newAppend().appendFile(dataFile2).commit(); + long snapshotId2 = table.currentSnapshot().snapshotId(); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of(dataFile1.path(), 0L), // id = 29 + Pair.of(dataFile1.path(), 3L) // id = 89 + ); + + Pair<DeleteFile, CharSequenceSet> posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); + + Schema deleteRowSchema = table.schema().select("data"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List<Record> dataDeletes = + Lists.newArrayList( + dataDelete.copy("data", "a"), // id = 29 + 
dataDelete.copy("data", "g") // id = 122 + ); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + dataDeletes, + deleteRowSchema); + + table + .newRowDelta() + .addDeletes(eqDeletes) + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + long snapshotId3 = table.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testAddingAndDeletingInSameCommit() throws IOException { + GenericRecord record = GenericRecord.create(table.schema()); + List<Record> records1b = Lists.newArrayList(); + records1b.add(record.copy("id", 28, "data", "a")); + records1b.add(record.copy("id", 29, "data", "a")); + records1b.add(record.copy("id", 43, "data", "b")); + records1b.add(record.copy("id", 44, "data", "b")); + records1b.add(record.copy("id", 61, "data", "c")); + records1b.add(record.copy("id", 89, "data", "d")); + DataFile dataFile1b = writeDataFile(records1b); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of(dataFile1b.path(), 0L), // id = 28 + Pair.of(dataFile1b.path(), 3L) // id = 44 + ); + + Pair<DeleteFile, CharSequenceSet> posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); + + table + .newRowDelta() + .addRows(dataFile1b) + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + // the resulting 
records in the table are the same as records1 + long snapshotId1 = table.currentSnapshot().snapshotId(); + + table.newAppend().appendFile(dataFile2).commit(); + long snapshotId2 = table.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testRepeatedEqualityDelete() throws IOException { + List<Record> rl1 = Lists.newArrayList(); + GenericRecord record = GenericRecord.create(table2.schema()); + rl1.add(record.copy("id", 1, "data", "a")); + rl1.add(record.copy("id", 2, "data", "b")); + rl1.add(record.copy("id", 3, "data", "c")); + DataFile df1 = writeDataFile(rl1); + table2.newAppend().appendFile(df1).commit(); + long snapshotId1 = table2.currentSnapshot().snapshotId(); + + // update row with id=2 by deleting it using an equality delete and writing a new row with id=2 + Schema deleteRowSchema = table2.schema().select("id"); + Record idDelete = GenericRecord.create(deleteRowSchema); + List<Record> idDeletes = Lists.newArrayList(idDelete.copy("id", 2)); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table2, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + idDeletes, + deleteRowSchema); + + List<Record> rl2 = Lists.newArrayList(record.copy("id", 2, "data", "bb")); + DataFile df2 = writeDataFile(rl2); + + table2.newRowDelta().addDeletes(eqDeletes).addRows(df2).commit(); + long snapshotId2 = table2.currentSnapshot().snapshotId(); + + // update row with id=2 again the same way + List<Record> idDeletes2 = Lists.newArrayList(idDelete.copy("id", 2)); + + DeleteFile eqDeletes2 = + FileHelpers.writeDeleteFile( + table2, + 
Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + idDeletes2, + deleteRowSchema); + + List<Record> rl3 = Lists.newArrayList(record.copy("id", 2, "data", "bbb")); + DataFile df3 = writeDataFile(rl3); + + table2.newRowDelta().addDeletes(eqDeletes2).addRows(df3).commit(); + long snapshotId3 = table2.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table2); + + List<Record> deleted1 = Lists.newArrayList(record.copy("id", 2, "data", "b")); + List<Record> deleted2 = Lists.newArrayList(record.copy("id", 2, "data", "bb")); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, rl1); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, deleted1); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, rl2); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, deleted2); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, rl3); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + @Test + public void testRepeatedUpdateInSameCommit() throws IOException { + List<Record> rl1 = Lists.newArrayList(); + GenericRecord record = GenericRecord.create(table2.schema()); + rl1.add(record.copy("id", 1, "data", "a")); + rl1.add(record.copy("id", 2, "data", "b")); + rl1.add(record.copy("id", 3, "data", "c")); + DataFile df1 = writeDataFile(rl1); + table2.newAppend().appendFile(df1).commit(); + long snapshotId1 = table2.currentSnapshot().snapshotId(); + + // delete row with id=2 with an equality delete and write a new row for it, but then delete the + // new row with a positional delete and write yet another new row for it + Schema deleteRowSchema = table2.schema().select("id"); + Record idDelete = GenericRecord.create(deleteRowSchema); + List<Record> idDeletes = Lists.newArrayList(idDelete.copy("id", 2)); + + 
DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + table2, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + idDeletes, + deleteRowSchema); + + List<Record> rl2 = Lists.newArrayList(); + rl2.add(record.copy("id", 2, "data", "bb")); + rl2.add(record.copy("id", 4, "data", "d")); + rl2.add(record.copy("id", 5, "data", "e")); + rl2.add(record.copy("id", 2, "data", "bbb")); + DataFile df2 = writeDataFile(rl2); + + List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(Pair.of(df2.path(), 0L)); + + Pair<DeleteFile, CharSequenceSet> posDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + deletes); + + table2 + .newRowDelta() + .addDeletes(eqDeletes) + .addRows(df2) + .addDeletes(posDeletes.first()) + .validateDataFilesExist(posDeletes.second()) + .commit(); + long snapshotId2 = table2.currentSnapshot().snapshotId(); + + List<InternalRow> rows = getChangelogRows(table2); + + // for snapshot 2, we should record only one delete for id=2 (from the equality delete) + // and one insert (the final value) + List<Record> deleted = Lists.newArrayList(); + deleted.add(record.copy("id", 2, "data", "b")); + List<Record> inserted = Lists.newArrayList(); + inserted.add(record.copy("id", 2, "data", "bbb")); + inserted.add(record.copy("id", 4, "data", "d")); + inserted.add(record.copy("id", 5, "data", "e")); + + List<Object[]> expectedRows = Lists.newArrayList(); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, rl1); + addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, deleted); + addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, inserted); + + assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows)); + } + + private IncrementalChangelogScan newScan() { + return table.newIncrementalChangelogScan(); + } + + private List<InternalRow> 
getChangelogRows(Table tbl) throws IOException { + CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = + tbl.newIncrementalChangelogScan().planTasks(); List<InternalRow> rows = Lists.newArrayList(); for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) { ChangelogRowReader reader = - new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false); + new ChangelogRowReader(tbl, taskGroup, tbl.schema(), tbl.schema(), false); while (reader.next()) { rows.add(reader.get().copy()); } reader.close(); } - // order by the change ordinal + // order by change ordinal, change type, data, id rows.sort( (r1, r2) -> { if (r1.getInt(3) != r2.getInt(3)) { return r1.getInt(3) - r2.getInt(3); + } else if (!r1.getUTF8String(2).equals(r2.getUTF8String(2))) { + return r1.getUTF8String(2).compareTo(r2.getUTF8String(2)); + } else if (!r1.getUTF8String(1).equals(r2.getUTF8String(1))) { + return r1.getUTF8String(1).compareTo(r2.getUTF8String(1)); } else { return r1.getInt(0) - r2.getInt(0); } }); Review Comment: this can probably be simplified to (order by change ordinal, change type, data, id) ```java rows.sort( Comparator.comparingInt((InternalRow r) -> r.getInt(3)) .thenComparing(r -> r.getUTF8String(2)) .thenComparing(r -> r.getUTF8String(1)) .thenComparingInt(r -> r.getInt(0))); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org