This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b2f3a4ce7b Spark: Backport ORC vectorized reader to use the delete filter (#14794)
b2f3a4ce7b is described below

commit b2f3a4ce7bcb21b966cd3928ae964cc98b9cbdf5
Author: pvary <[email protected]>
AuthorDate: Tue Dec 9 13:31:27 2025 +0100

    Spark: Backport ORC vectorized reader to use the delete filter (#14794)
    
    backports #14746
---
 .../data/vectorized/VectorizedSparkOrcReaders.java |  4 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java | 71 ++++++++++++++++++++
 .../data/TestSparkOrcReadMetadataColumns.java      | 54 ++++++++++++---
 .../data/TestSparkParquetReadMetadataColumns.java  | 78 ++--------------------
 .../data/vectorized/VectorizedSparkOrcReaders.java |  4 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java | 71 ++++++++++++++++++++
 .../data/TestSparkOrcReadMetadataColumns.java      | 54 ++++++++++++---
 .../data/TestSparkParquetReadMetadataColumns.java  | 78 ++--------------------
 8 files changed, 250 insertions(+), 164 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index 76e0ee3811..8dceb075e6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -462,7 +462,9 @@ public class VectorizedSparkOrcReaders {
         } else if (field.equals(MetadataColumns.ROW_POSITION)) {
           fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
         } else if (field.equals(MetadataColumns.IS_DELETED)) {
-          fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
+          DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
+          deletedVector.setValue(new boolean[batchSize]);
+          fieldVectors.add(deletedVector);
         } else {
           fieldVectors.add(
               fieldConverters
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 120d6eeb17..72f9a36609 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,10 +52,16 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
@@ -895,4 +901,69 @@ public class TestHelpers {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  public static class CustomizedDeleteFilter extends DeleteFilter<InternalRow> {
+    private final boolean hasDeletes;
+
+    protected CustomizedDeleteFilter(
+        boolean hasDeletes, Schema tableSchema, Schema projectedSchema) {
+      super("", List.of(), tableSchema, projectedSchema, new DeleteCounter(), true);
+      this.hasDeletes = hasDeletes;
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow record) {
+      return null;
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return null;
+    }
+
+    @Override
+    public boolean hasPosDeletes() {
+      return hasDeletes;
+    }
+
+    @Override
+    public PositionDeleteIndex deletedRowPositions() {
+      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+      if (hasDeletes) {
+        deletedRowPos.delete(98, 103);
+      }
+
+      return deletedRowPos;
+    }
+  }
+
+  public static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+    private final Set<Long> deleteIndex;
+
+    private CustomizedPositionDeleteIndex() {
+      deleteIndex = Sets.newHashSet();
+    }
+
+    @Override
+    public void delete(long position) {
+      deleteIndex.add(position);
+    }
+
+    @Override
+    public void delete(long posStart, long posEnd) {
+      for (long l = posStart; l < posEnd; l++) {
+        delete(l);
+      }
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+      return deleteIndex.contains(position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return deleteIndex.isEmpty();
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 9d725250d3..13acaa1e3a 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -46,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.OrcConf;
@@ -74,7 +77,11 @@ public class TestSparkOrcReadMetadataColumns {
           MetadataColumns.ROW_POSITION,
           MetadataColumns.IS_DELETED);
 
+  private static final DeleteFilter<InternalRow> NO_DELETES_FILTER =
+      new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA);
+
   private static final int NUM_ROWS = 1000;
+  private static final int RECORDS_PER_BATCH = 10;
   private static final List<InternalRow> DATA_ROWS;
   private static final List<InternalRow> EXPECTED_ROWS;
 
@@ -128,13 +135,35 @@ public class TestSparkOrcReadMetadataColumns {
 
   @TestTemplate
   public void testReadRowNumbers() throws IOException {
-    readAndValidate(null, null, null, EXPECTED_ROWS);
+    readAndValidate(null, null, null, EXPECTED_ROWS, NO_DELETES_FILTER);
+  }
+
+  @TestTemplate
+  public void testReadRowNumbersWithDelete() throws IOException {
+    assumeThat(vectorized).isTrue();
+
+    List<InternalRow> expectedRowsAfterDelete = Lists.newArrayList();
+    EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
+    // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100,
+    // 200)
+    for (int i = 98; i <= 102; i++) {
+      expectedRowsAfterDelete.get(i).update(3, true);
+    }
+
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
+
+    readAndValidate(null, null, null, expectedRowsAfterDelete, deleteFilter);
   }
 
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     readAndValidate(
-        Expressions.greaterThanOrEqual("id", 500), null, null, EXPECTED_ROWS.subList(500, 1000));
+        Expressions.greaterThanOrEqual("id", 500),
+        null,
+        null,
+        EXPECTED_ROWS.subList(500, 1000),
+        NO_DELETES_FILTER);
   }
 
   @TestTemplate
@@ -157,12 +186,17 @@ public class TestSparkOrcReadMetadataColumns {
           null,
           splitOffsets.get(i),
           splitLengths.get(i),
-          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
+          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100),
+          NO_DELETES_FILTER);
     }
   }
 
   private void readAndValidate(
-      Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected)
+      Expression filter,
+      Long splitStart,
+      Long splitLength,
+      List<InternalRow> expected,
+      DeleteFilter<InternalRow> deleteFilter)
       throws IOException {
     Schema projectionWithoutMetadataFields =
         TypeUtil.selectNot(PROJECTION_SCHEMA, MetadataColumns.metadataFieldIds());
@@ -173,10 +207,12 @@ public class TestSparkOrcReadMetadataColumns {
 
       if (vectorized) {
         builder =
-            builder.createBatchedReaderFunc(
-                readOrcSchema ->
-                    VectorizedSparkOrcReaders.buildReader(
-                        PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
+            builder
+                .recordsPerBatch(RECORDS_PER_BATCH)
+                .createBatchedReaderFunc(
+                    readOrcSchema ->
+                        VectorizedSparkOrcReaders.buildReader(
+                            PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
       } else {
         builder =
             builder.createReaderFunc(
@@ -192,7 +228,7 @@ public class TestSparkOrcReadMetadataColumns {
       }
 
       if (vectorized) {
-        reader = batchesToRows(builder.build());
+        reader = batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter));
       } else {
         reader = builder.build();
       }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index ccd783915c..e2e5a98ccb 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -26,7 +26,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
@@ -35,21 +34,16 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.DeleteCounter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.spark.source.BatchReaderUtil;
@@ -183,7 +177,8 @@ public class TestSparkParquetReadMetadataColumns {
     Parquet.ReadBuilder builder =
         Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
 
-    DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
 
     builder.createBatchedReaderFunc(
         fileSchema ->
@@ -194,70 +189,6 @@ public class TestSparkParquetReadMetadataColumns {
     validate(expectedRowsAfterDelete, builder, deleteFilter);
   }
 
-  private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
-    private final boolean hasDeletes;
-
-    protected TestDeleteFilter(boolean hasDeletes) {
-      super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new DeleteCounter(), true);
-      this.hasDeletes = hasDeletes;
-    }
-
-    @Override
-    protected StructLike asStructLike(InternalRow record) {
-      return null;
-    }
-
-    @Override
-    protected InputFile getInputFile(String location) {
-      return null;
-    }
-
-    @Override
-    public boolean hasPosDeletes() {
-      return hasDeletes;
-    }
-
-    @Override
-    public PositionDeleteIndex deletedRowPositions() {
-      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
-      if (hasDeletes) {
-        deletedRowPos.delete(98, 103);
-      }
-
-      return deletedRowPos;
-    }
-  }
-
-  private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
-    private final Set<Long> deleteIndex;
-
-    private CustomizedPositionDeleteIndex() {
-      deleteIndex = Sets.newHashSet();
-    }
-
-    @Override
-    public void delete(long position) {
-      deleteIndex.add(position);
-    }
-
-    @Override
-    public void delete(long posStart, long posEnd) {
-      for (long l = posStart; l < posEnd; l++) {
-        delete(l);
-      }
-    }
-
-    @Override
-    public boolean isDeleted(long position) {
-      return deleteIndex.contains(position);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return deleteIndex.isEmpty();
-    }
-  }
-
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     // current iceberg supports row group filter.
@@ -314,7 +245,10 @@ public class TestSparkParquetReadMetadataColumns {
       builder = builder.split(splitStart, splitLength);
     }
 
-    validate(expected, builder, new TestDeleteFilter(false));
+    validate(
+        expected,
+        builder,
+        new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA));
   }
 
   private void validate(
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index 5f68c233f6..4f32423988 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -462,7 +462,9 @@ public class VectorizedSparkOrcReaders {
         } else if (field.equals(MetadataColumns.ROW_POSITION)) {
           fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
         } else if (field.equals(MetadataColumns.IS_DELETED)) {
-          fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
+          DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
+          deletedVector.setValue(new boolean[batchSize]);
+          fieldVectors.add(deletedVector);
         } else if (field.type().equals(Types.UnknownType.get())) {
           fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, null));
         } else {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 82ebb1d950..dae8612f7d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,10 +52,16 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
@@ -876,4 +882,69 @@ public class TestHelpers {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  public static class CustomizedDeleteFilter extends DeleteFilter<InternalRow> {
+    private final boolean hasDeletes;
+
+    protected CustomizedDeleteFilter(
+        boolean hasDeletes, Schema tableSchema, Schema projectedSchema) {
+      super("", List.of(), tableSchema, projectedSchema, new DeleteCounter(), true);
+      this.hasDeletes = hasDeletes;
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow record) {
+      return null;
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return null;
+    }
+
+    @Override
+    public boolean hasPosDeletes() {
+      return hasDeletes;
+    }
+
+    @Override
+    public PositionDeleteIndex deletedRowPositions() {
+      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+      if (hasDeletes) {
+        deletedRowPos.delete(98, 103);
+      }
+
+      return deletedRowPos;
+    }
+  }
+
+  public static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+    private final Set<Long> deleteIndex;
+
+    private CustomizedPositionDeleteIndex() {
+      deleteIndex = Sets.newHashSet();
+    }
+
+    @Override
+    public void delete(long position) {
+      deleteIndex.add(position);
+    }
+
+    @Override
+    public void delete(long posStart, long posEnd) {
+      for (long l = posStart; l < posEnd; l++) {
+        delete(l);
+      }
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+      return deleteIndex.contains(position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return deleteIndex.isEmpty();
+    }
+  }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 9d725250d3..13acaa1e3a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data;
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -46,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.OrcConf;
@@ -74,7 +77,11 @@ public class TestSparkOrcReadMetadataColumns {
           MetadataColumns.ROW_POSITION,
           MetadataColumns.IS_DELETED);
 
+  private static final DeleteFilter<InternalRow> NO_DELETES_FILTER =
+      new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA);
+
   private static final int NUM_ROWS = 1000;
+  private static final int RECORDS_PER_BATCH = 10;
   private static final List<InternalRow> DATA_ROWS;
   private static final List<InternalRow> EXPECTED_ROWS;
 
@@ -128,13 +135,35 @@ public class TestSparkOrcReadMetadataColumns {
 
   @TestTemplate
   public void testReadRowNumbers() throws IOException {
-    readAndValidate(null, null, null, EXPECTED_ROWS);
+    readAndValidate(null, null, null, EXPECTED_ROWS, NO_DELETES_FILTER);
+  }
+
+  @TestTemplate
+  public void testReadRowNumbersWithDelete() throws IOException {
+    assumeThat(vectorized).isTrue();
+
+    List<InternalRow> expectedRowsAfterDelete = Lists.newArrayList();
+    EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
+    // remove row at position 98, 99, 100, 101, 102, this crosses two row groups [0, 100) and [100,
+    // 200)
+    for (int i = 98; i <= 102; i++) {
+      expectedRowsAfterDelete.get(i).update(3, true);
+    }
+
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
+
+    readAndValidate(null, null, null, expectedRowsAfterDelete, deleteFilter);
   }
 
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     readAndValidate(
-        Expressions.greaterThanOrEqual("id", 500), null, null, EXPECTED_ROWS.subList(500, 1000));
+        Expressions.greaterThanOrEqual("id", 500),
+        null,
+        null,
+        EXPECTED_ROWS.subList(500, 1000),
+        NO_DELETES_FILTER);
   }
 
   @TestTemplate
@@ -157,12 +186,17 @@ public class TestSparkOrcReadMetadataColumns {
           null,
           splitOffsets.get(i),
           splitLengths.get(i),
-          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
+          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100),
+          NO_DELETES_FILTER);
     }
   }
 
   private void readAndValidate(
-      Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected)
+      Expression filter,
+      Long splitStart,
+      Long splitLength,
+      List<InternalRow> expected,
+      DeleteFilter<InternalRow> deleteFilter)
       throws IOException {
     Schema projectionWithoutMetadataFields =
         TypeUtil.selectNot(PROJECTION_SCHEMA, MetadataColumns.metadataFieldIds());
@@ -173,10 +207,12 @@ public class TestSparkOrcReadMetadataColumns {
 
       if (vectorized) {
         builder =
-            builder.createBatchedReaderFunc(
-                readOrcSchema ->
-                    VectorizedSparkOrcReaders.buildReader(
-                        PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
+            builder
+                .recordsPerBatch(RECORDS_PER_BATCH)
+                .createBatchedReaderFunc(
+                    readOrcSchema ->
+                        VectorizedSparkOrcReaders.buildReader(
+                            PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
       } else {
         builder =
             builder.createReaderFunc(
@@ -192,7 +228,7 @@ public class TestSparkOrcReadMetadataColumns {
       }
 
       if (vectorized) {
-        reader = batchesToRows(builder.build());
+        reader = batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter));
       } else {
         reader = builder.build();
       }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index ccd783915c..e2e5a98ccb 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -26,7 +26,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
@@ -35,21 +34,16 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.DeleteCounter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.spark.source.BatchReaderUtil;
@@ -183,7 +177,8 @@ public class TestSparkParquetReadMetadataColumns {
     Parquet.ReadBuilder builder =
         Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
 
-    DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
 
     builder.createBatchedReaderFunc(
         fileSchema ->
@@ -194,70 +189,6 @@ public class TestSparkParquetReadMetadataColumns {
     validate(expectedRowsAfterDelete, builder, deleteFilter);
   }
 
-  private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
-    private final boolean hasDeletes;
-
-    protected TestDeleteFilter(boolean hasDeletes) {
-      super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new DeleteCounter(), true);
-      this.hasDeletes = hasDeletes;
-    }
-
-    @Override
-    protected StructLike asStructLike(InternalRow record) {
-      return null;
-    }
-
-    @Override
-    protected InputFile getInputFile(String location) {
-      return null;
-    }
-
-    @Override
-    public boolean hasPosDeletes() {
-      return hasDeletes;
-    }
-
-    @Override
-    public PositionDeleteIndex deletedRowPositions() {
-      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
-      if (hasDeletes) {
-        deletedRowPos.delete(98, 103);
-      }
-
-      return deletedRowPos;
-    }
-  }
-
-  private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
-    private final Set<Long> deleteIndex;
-
-    private CustomizedPositionDeleteIndex() {
-      deleteIndex = Sets.newHashSet();
-    }
-
-    @Override
-    public void delete(long position) {
-      deleteIndex.add(position);
-    }
-
-    @Override
-    public void delete(long posStart, long posEnd) {
-      for (long l = posStart; l < posEnd; l++) {
-        delete(l);
-      }
-    }
-
-    @Override
-    public boolean isDeleted(long position) {
-      return deleteIndex.contains(position);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return deleteIndex.isEmpty();
-    }
-  }
-
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     // current iceberg supports row group filter.
@@ -314,7 +245,10 @@ public class TestSparkParquetReadMetadataColumns {
       builder = builder.split(splitStart, splitLength);
     }
 
-    validate(expected, builder, new TestDeleteFilter(false));
+    validate(
+        expected,
+        builder,
+        new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA));
   }
 
   private void validate(

Reply via email to