This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit a4816c1c99063770473920cd6d62f88f90a292dc Author: Eduard Tudenhoefner <[email protected]> AuthorDate: Fri Mar 21 07:42:19 2025 +0100 Spark 3.4: Include content offset/size in PositionDeletesTable --- .../apache/iceberg/spark/source/DVIterator.java | 5 ++ .../spark/source/TestPositionDeletesReader.java | 70 +++++++++++++++------- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java index 7b08b86cbf..0c319e2bd4 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java @@ -30,6 +30,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.unsafe.types.UTF8String; @@ -79,6 +80,10 @@ class DVIterator implements CloseableIterator<InternalRow> { rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID)); } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) { rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID)); + } else if (fieldId == MetadataColumns.CONTENT_OFFSET_COLUMN_ID) { + rowValues.add(deleteFile.contentOffset()); + } else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) { + rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile)); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java index 5b876dfc57..c182413f39 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java @@ -157,13 +157,17 @@ public class TestPositionDeletesReader extends TestBase { Table positionDeletesTable = catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes")); - Schema projectedSchema = - positionDeletesTable - .schema() - .select( - MetadataColumns.DELETE_FILE_PATH.name(), - MetadataColumns.DELETE_FILE_POS.name(), - PositionDeletesTable.DELETE_FILE_PATH); + List<String> columns = + Lists.newArrayList( + MetadataColumns.DELETE_FILE_PATH.name(), + MetadataColumns.DELETE_FILE_POS.name(), + PositionDeletesTable.DELETE_FILE_PATH); + if (formatVersion >= 3) { + columns.add(PositionDeletesTable.CONTENT_OFFSET); + columns.add(PositionDeletesTable.CONTENT_SIZE_IN_BYTES); + } + + Schema projectedSchema = positionDeletesTable.schema().select(columns); List<ScanTask> scanTasks = Lists.newArrayList( @@ -187,15 +191,27 @@ public class TestPositionDeletesReader extends TestBase { String dataFileLocation = formatVersion >= 3 ? deleteFile1.referencedDataFile() : dataFile1.location(); - Object[] first = { - UTF8String.fromString(dataFileLocation), 0L, UTF8String.fromString(deleteFile1.location()) - }; - Object[] second = { - UTF8String.fromString(dataFileLocation), 1L, UTF8String.fromString(deleteFile1.location()) - }; + List<Object> first = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 0L, + UTF8String.fromString(deleteFile1.location())); + List<Object> second = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 1L, + UTF8String.fromString(deleteFile1.location())); + + if (formatVersion >= 3) { + first.add(deleteFile1.contentOffset()); + first.add(deleteFile1.contentSizeInBytes()); + second.add(deleteFile1.contentOffset()); + second.add(deleteFile1.contentSizeInBytes()); + } + assertThat(internalRowsToJava(actualRows, projectedSchema)) .hasSize(2) - .containsExactly(first, second); + .containsExactly(first.toArray(), second.toArray()); } assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class); @@ -214,15 +230,27 @@ public class TestPositionDeletesReader extends TestBase { String dataFileLocation = formatVersion >= 3 ? deleteFile2.referencedDataFile() : dataFile2.location(); - Object[] first = { - UTF8String.fromString(dataFileLocation), 2L, UTF8String.fromString(deleteFile2.location()) - }; - Object[] second = { - UTF8String.fromString(dataFileLocation), 3L, UTF8String.fromString(deleteFile2.location()) - }; + List<Object> first = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 2L, + UTF8String.fromString(deleteFile2.location())); + List<Object> second = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 3L, + UTF8String.fromString(deleteFile2.location())); + + if (formatVersion >= 3) { + first.add(deleteFile2.contentOffset()); + first.add(deleteFile2.contentSizeInBytes()); + second.add(deleteFile2.contentOffset()); + second.add(deleteFile2.contentSizeInBytes()); + } + assertThat(internalRowsToJava(actualRows, projectedSchema)) .hasSize(2) - .containsExactly(first, second); + .containsExactly(first.toArray(), second.toArray()); } }
