wypoon commented on code in PR #6026:
URL: https://github.com/apache/iceberg/pull/6026#discussion_r1003542296
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -508,6 +530,75 @@ public void testIsDeletedColumnWithoutDeleteFile() {
checkDeleteCount(0L);
}
+ @Test
+ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws
IOException {
+ Assume.assumeTrue(format.equals("parquet"));
+
+ String tblName = "test3";
+ Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+
+ List<Path> fileSplits = Lists.newArrayList();
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ Configuration conf = new Configuration();
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+ Path testFilePath = new Path(testFile.getAbsolutePath());
+
+ // Write a Parquet file with more than one row group
+ ParquetFileWriter parquetFileWriter =
+ new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA,
"test3Schema"), testFilePath);
+ parquetFileWriter.start();
+ for (int i = 0; i < 2; i += 1) {
+ File split = temp.newFile();
+ Assert.assertTrue("Delete should succeed", split.delete());
+ Path splitPath = new Path(split.getAbsolutePath());
+ fileSplits.add(splitPath);
+ try (FileAppender<InternalRow> writer =
+ Parquet.write(Files.localOutput(split))
+ .createWriterFunc(msgType ->
SparkParquetWriters.buildWriter(sparkSchema, msgType))
+ .schema(SCHEMA)
+ .overwrite()
+ .build()) {
+ Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100,
34 * i + 37);
+ writer.addAll(records);
+ }
+ parquetFileWriter.appendFile(
+ org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath,
conf));
+ }
+ parquetFileWriter.end(
+ ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
+ .getFileMetaData()
+ .getKeyValueMetaData());
+
+ // Add the file to the table
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+
.withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath,
conf))
+ .withFormat("parquet")
+ .withRecordCount(200)
+ .build();
+ tbl.newAppend().appendFile(dataFile).commit();
+
+ // Add positional deletes to the table
+ List<Pair<CharSequence, Long>> deletes =
+ Lists.newArrayList(
+ Pair.of(dataFile.path(), 97L),
+ Pair.of(dataFile.path(), 98L),
+ Pair.of(dataFile.path(), 99L),
+ Pair.of(dataFile.path(), 101L),
+ Pair.of(dataFile.path(), 103L),
+ Pair.of(dataFile.path(), 107L),
+ Pair.of(dataFile.path(), 109L));
+ Pair<DeleteFile, CharSequenceSet> posDeletes =
+ FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()),
deletes);
+ tbl.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
Review Comment:
Without the fix, this assertion fails for the vectorized case.
There are 3 deletes applied to the first row group and 4 deletes applied to
the second row group. Without the fix, the 3 deletes for the first row group
are applied to the second as well. Thus 6 rows are deleted and the result is
194 rows, instead of the expected 193.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]