This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c46639e0e Spark 3.5: Pass FileIO on Spark's read path (#15683)
0c46639e0e is described below

commit 0c46639e0e2e60748d6823d3620bd70838151fa9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Mar 19 15:19:18 2026 +0100

    Spark 3.5: Pass FileIO on Spark's read path (#15683)
---
 .../TestRemoteScanPlanning.java                    |  47 +++++++-
 .../iceberg/spark/source/BaseBatchReader.java      |  10 +-
 .../apache/iceberg/spark/source/BaseReader.java    |   9 +-
 .../apache/iceberg/spark/source/BaseRowReader.java |  10 +-
 .../iceberg/spark/source/BatchDataReader.java      |   4 +
 .../iceberg/spark/source/ChangelogRowReader.java   |   4 +
 .../spark/source/EqualityDeleteRowReader.java      |  11 +-
 .../spark/source/PositionDeletesRowReader.java     |  12 ++-
 .../apache/iceberg/spark/source/RowDataReader.java |  11 +-
 .../spark/source/SerializableFileIOWithSize.java   | 120 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkBatch.java    |  10 +-
 .../iceberg/spark/source/SparkChangelogScan.java   |   1 +
 .../iceberg/spark/source/SparkInputPartition.java  |   8 ++
 .../spark/source/SparkMicroBatchStream.java        |  11 +-
 .../spark/source/SparkPartitioningAwareScan.java   |   9 +-
 .../org/apache/iceberg/spark/source/SparkScan.java |  15 ++-
 .../iceberg/spark/source/SparkStagedScan.java      |   2 +-
 .../apache/iceberg/spark/TestBaseWithCatalog.java  |   6 +-
 .../iceberg/spark/source/TestBaseReader.java       |   2 +-
 .../iceberg/spark/source/TestChangelogReader.java  |  12 ++-
 .../spark/source/TestPositionDeletesReader.java    |   3 +
 .../spark/source/TestSparkReaderDeletes.java       |   4 +-
 22 files changed, 296 insertions(+), 25 deletions(-)

diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
similarity index 53%
rename from 
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
rename to 
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
index 9c31eb970b..0152c8e0e6 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
@@ -16,15 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.spark.extensions;
+package org.apache.iceberg.spark.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTCatalogProperties;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.sql.TestSelect;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ public class TestRemoteScanPlanning extends TestSelect {
         ImmutableMap.builder()
             .putAll(SparkCatalogConfig.REST.properties())
             .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
-            // this flag is typically only set by the server, but we set it 
from the client for
-            // testing
             .put(
                 RESTCatalogProperties.SCAN_PLANNING_MODE,
                 RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ public class TestRemoteScanPlanning extends TestSelect {
       }
     };
   }
+
+  @TestTemplate
+  public void fileIOIsPropagated() {
+    RESTCatalog catalog = new RESTCatalog();
+    catalog.setConf(new Configuration());
+    catalog.initialize(
+        "test",
+        ImmutableMap.<String, String>builder()
+            .putAll(restCatalog.properties())
+            .put(
+                RESTCatalogProperties.SCAN_PLANNING_MODE,
+                RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
+            .build());
+    Table table = catalog.loadTable(tableIdent);
+
+    SparkScanBuilder builder = new SparkScanBuilder(spark, table, 
CaseInsensitiveStringMap.empty());
+    verifyFileIOHasPlanId(builder.build().toBatch(), table);
+    verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
+  }
+
+  private void verifyFileIOHasPlanId(Batch batch, Table table) {
+    FileIO fileIOForScan =
+        (FileIO)
+            assertThat(batch)
+                .extracting("fileIO")
+                .isInstanceOf(Supplier.class)
+                .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
+                .actual()
+                .get();
+    
assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    
assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index fe062f9d73..c1b2a58737 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -49,6 +50,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
 
   BaseBatchReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
@@ -57,7 +59,13 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
       OrcBatchReadConf orcConf,
       boolean cacheDeleteFilesOnExecutors) {
     super(
-        table, taskGroup, tableSchema, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+        table,
+        fileIO,
+        taskGroup,
+        tableSchema,
+        expectedSchema,
+        caseSensitive,
+        cacheDeleteFilesOnExecutors);
     this.parquetConf = parquetConf;
     this.orcConf = orcConf;
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index bf16226171..0333f1e45d 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
@@ -66,6 +67,7 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
 
   private final Table table;
+  private final EncryptingFileIO fileIO;
   private final Schema tableSchema;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
@@ -82,12 +84,14 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
 
   BaseReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<TaskT> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     this.table = table;
+    this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
     this.taskGroup = taskGroup;
     this.tasks = taskGroup.tasks().iterator();
     this.currentIterator = CloseableIterator.empty();
@@ -180,9 +184,8 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
   private Map<String, InputFile> inputFiles() {
     if (lazyInputFiles == null) {
       this.lazyInputFiles =
-          EncryptingFileIO.combine(table().io(), table().encryption())
-              .bulkDecrypt(
-                  () -> 
taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
+          fileIO.bulkDecrypt(
+              () -> 
taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
     }
 
     return lazyInputFiles;
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 53d44e760a..a7016e3b09 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -28,19 +28,27 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends 
BaseReader<InternalRow, T> {
   BaseRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     super(
-        table, taskGroup, tableSchema, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+        table,
+        fileIO,
+        taskGroup,
+        tableSchema,
+        expectedSchema,
+        caseSensitive,
+        cacheDeleteFilesOnExecutors);
   }
 
   protected CloseableIterable<InternalRow> newIterable(
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 9ec0f88577..3dcfb604ea 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -53,6 +54,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       OrcBatchReadConf orcBatchReadConf) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         SnapshotUtil.schemaFor(partition.table(), partition.branch()),
         partition.expectedSchema(),
@@ -64,6 +66,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
 
   BatchDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
@@ -73,6 +76,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
         taskGroup,
         tableSchema,
         expectedSchema,
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index b8fa129f6a..3657475651 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -51,6 +52,7 @@ class ChangelogRowReader extends 
BaseRowReader<ChangelogScanTask>
   ChangelogRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         SnapshotUtil.schemaFor(partition.table(), partition.branch()),
         partition.expectedSchema(),
@@ -60,6 +62,7 @@ class ChangelogRowReader extends 
BaseRowReader<ChangelogScanTask>
 
   ChangelogRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<ChangelogScanTask> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
@@ -67,6 +70,7 @@ class ChangelogRowReader extends 
BaseRowReader<ChangelogScanTask>
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
         taskGroup,
         tableSchema,
         ChangelogUtil.dropChangelogMetadata(expectedSchema),
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index e1292647b7..96dd99ea64 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -32,11 +33,19 @@ public class EqualityDeleteRowReader extends RowDataReader {
   public EqualityDeleteRowReader(
       CombinedScanTask task,
       Table table,
+      FileIO fileIO,
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, task, tableSchema, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(
+        table,
+        fileIO,
+        task,
+        tableSchema,
+        expectedSchema,
+        caseSensitive,
+        cacheDeleteFilesOnExecutors);
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 8ad1f3ad39..7c5969effb 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -49,6 +50,7 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
   PositionDeletesRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         SnapshotUtil.schemaFor(partition.table(), partition.branch()),
         partition.expectedSchema(),
@@ -58,14 +60,20 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
 
   PositionDeletesRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<PositionDeletesScanTask> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-
     super(
-        table, taskGroup, tableSchema, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+        table,
+        fileIO,
+        taskGroup,
+        tableSchema,
+        expectedSchema,
+        caseSensitive,
+        cacheDeleteFilesOnExecutors);
 
     int numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} position delete file split(s) for table {}", 
numSplits, table.name());
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index b2b3c78563..08aa44f710 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -48,6 +49,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> 
implements PartitionRead
   RowDataReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         SnapshotUtil.schemaFor(partition.table(), partition.branch()),
         partition.expectedSchema(),
@@ -57,6 +59,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> 
implements PartitionRead
 
   RowDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema tableSchema,
       Schema expectedSchema,
@@ -64,7 +67,13 @@ class RowDataReader extends BaseRowReader<FileScanTask> 
implements PartitionRead
       boolean cacheDeleteFilesOnExecutors) {
 
     super(
-        table, taskGroup, tableSchema, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+        table,
+        fileIO,
+        taskGroup,
+        tableSchema,
+        expectedSchema,
+        caseSensitive,
+        cacheDeleteFilesOnExecutors);
 
     numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} file split(s) for table {}", numSplits, 
table.name());
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
new file mode 100644
index 0000000000..49189d9d57
--- /dev/null
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.spark.util.KnownSizeEstimation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a serializable {@link FileIO} with a known size 
estimate. Spark calls its
+ * {@link org.apache.spark.util.SizeEstimator} class when broadcasting 
variables and this can be an
+ * expensive operation, so providing a known size estimate allows that 
operation to be skipped.
+ *
+ * <p>This class also implements {@link AutoCloseable} to avoid leaking 
resources upon broadcasting.
+ * Broadcast variables are destroyed and cleaned up on the driver and 
executors once they are
+ * garbage collected on the driver. The implementation ensures only resources 
used by copies of the
+ * main {@link FileIO} are released.
+ */
+class SerializableFileIOWithSize
+    implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SerializableFileIOWithSize.class);
+  private static final long SIZE_ESTIMATE = 32_768L;
+  private final transient Object serializationMarker;
+  private final FileIO fileIO;
+
+  private SerializableFileIOWithSize(FileIO fileIO) {
+    this.fileIO = fileIO;
+    this.serializationMarker = new Object();
+  }
+
+  @Override
+  public long estimatedSize() {
+    return SIZE_ESTIMATE;
+  }
+
+  public static FileIO wrap(FileIO fileIO) {
+    return new SerializableFileIOWithSize(fileIO);
+  }
+
+  @Override
+  public void close() {
+    if (null == serializationMarker) {
+      LOG.debug("Closing FileIO");
+      fileIO.close();
+    }
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return fileIO.newInputFile(path);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return fileIO.newOutputFile(path);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    fileIO.deleteFile(path);
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    fileIO.initialize(properties);
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return fileIO.properties();
+  }
+
+  @Override
+  public void serializeConfWith(
+      Function<Configuration, SerializableSupplier<Configuration>> 
confSerializer) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.serializeConfWith(confSerializer);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.setConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    if (fileIO instanceof HadoopConfigurable hadoopConfigurable) {
+      return hadoopConfigurable.getConf();
+    }
+
+    return null;
+  }
+}
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 261d5fa227..2109936c96 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -28,6 +29,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
 import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -45,6 +47,7 @@ class SparkBatch implements Batch {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final String branch;
   private final SparkReadConf readConf;
   private final Types.StructType groupingKeyType;
@@ -59,6 +62,7 @@ class SparkBatch implements Batch {
   SparkBatch(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Types.StructType groupingKeyType,
       List<? extends ScanTaskGroup<?>> taskGroups,
@@ -66,6 +70,7 @@ class SparkBatch implements Batch {
       int scanHashCode) {
     this.sparkContext = sparkContext;
     this.table = table;
+    this.fileIO = fileIO;
     this.branch = readConf.branch();
     this.readConf = readConf;
     this.groupingKeyType = groupingKeyType;
@@ -83,6 +88,8 @@ class SparkBatch implements Batch {
     // broadcast the table metadata as input partitions will be sent to 
executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    Broadcast<FileIO> fileIOBroadcast =
+        sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
     String[][] locations = computePreferredLocations();
 
@@ -94,6 +101,7 @@ class SparkBatch implements Batch {
               groupingKeyType,
               taskGroups.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               branch,
               expectedSchemaString,
               caseSensitive,
@@ -106,7 +114,7 @@ class SparkBatch implements Batch {
 
   private String[][] computePreferredLocations() {
     if (localityEnabled) {
-      return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups);
+      return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
 
     } else if (executorCacheLocalityEnabled) {
       List<String> executorLocations = SparkUtil.executorLocations();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 55ea137ca1..eba0431e3a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -106,6 +106,7 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
     return new SparkBatch(
         sparkContext,
         table,
+        null != scan ? scan.fileIO() : table::io,
         readConf,
         EMPTY_GROUPING_KEY_TYPE,
         taskGroups(),
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 99b1d78a86..a930317802 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
   private final Types.StructType groupingKeyType;
   private final ScanTaskGroup<?> taskGroup;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final String branch;
   private final String expectedSchemaString;
   private final boolean caseSensitive;
@@ -46,6 +48,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
       Types.StructType groupingKeyType,
       ScanTaskGroup<?> taskGroup,
       Broadcast<Table> tableBroadcast,
+      Broadcast<FileIO> fileIOBroadcast,
       String branch,
       String expectedSchemaString,
       boolean caseSensitive,
@@ -54,6 +57,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
     this.groupingKeyType = groupingKeyType;
     this.taskGroup = taskGroup;
     this.tableBroadcast = tableBroadcast;
+    this.fileIOBroadcast = fileIOBroadcast;
     this.branch = branch;
     this.expectedSchemaString = expectedSchemaString;
     this.caseSensitive = caseSensitive;
@@ -84,6 +88,10 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
     return tableBroadcast.value();
   }
 
+  public FileIO io() {
+    return fileIOBroadcast.value();
+  }
+
   public String branch() {
     return branch;
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 0745085259..a82583747a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -27,6 +27,7 @@ import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Locale;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataOperations;
@@ -77,10 +78,12 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = 
Types.StructType.of();
 
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final String branch;
   private final boolean caseSensitive;
   private final String expectedSchema;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final long splitSize;
   private final int splitLookback;
   private final long splitOpenFileCost;
@@ -97,15 +100,18 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema expectedSchema,
       String checkpointLocation) {
     this.table = table;
+    this.fileIO = fileIO;
     this.branch = readConf.branch();
     this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = SchemaParser.toJson(expectedSchema);
     this.localityPreferred = readConf.localityEnabled();
     this.tableBroadcast = 
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    this.fileIOBroadcast = 
sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
@@ -172,6 +178,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
               EMPTY_GROUPING_KEY_TYPE,
               combinedScanTasks.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               branch,
               expectedSchema,
               caseSensitive,
@@ -183,7 +190,9 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   }
 
   private String[][] computePreferredLocations(List<CombinedScanTask> 
taskGroups) {
-    return localityPreferred ? 
SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null;
+    return localityPreferred
+        ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups)
+        : null;
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index c9726518ee..4d9fb7556b 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -78,7 +78,14 @@ abstract class SparkPartitioningAwareScan<T extends 
PartitionScanTask> extends S
       Schema expectedSchema,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
+    super(
+        spark,
+        table,
+        null != scan ? scan.fileIO() : table::io,
+        readConf,
+        expectedSchema,
+        filters,
+        scanReportSupplier);
 
     this.scan = scan;
     this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de0..a921f1446a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,6 +102,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final SparkSession spark;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
@@ -115,6 +117,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
   SparkScan(
       SparkSession spark,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema expectedSchema,
       List<Expression> filters,
@@ -125,6 +128,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
     this.spark = spark;
     this.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
+    this.fileIO = fileIO;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = expectedSchema;
@@ -162,13 +166,20 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
   @Override
   public Batch toBatch() {
     return new SparkBatch(
-        sparkContext, table, readConf, groupingKeyType(), taskGroups(), 
expectedSchema, hashCode());
+        sparkContext,
+        table,
+        fileIO,
+        readConf,
+        groupingKeyType(),
+        taskGroups(),
+        expectedSchema,
+        hashCode());
   }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
     return new SparkMicroBatchStream(
-        sparkContext, table, readConf, expectedSchema, checkpointLocation);
+        sparkContext, table, fileIO, readConf, expectedSchema, 
checkpointLocation);
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index d2eb4e5a56..99e0deabb0 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -41,7 +41,7 @@ class SparkStagedScan extends SparkScan {
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of 
tasks
 
   SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, 
SparkReadConf readConf) {
-    super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);
+    super(spark, table, table::io, readConf, expectedSchema, 
ImmutableList.of(), null);
     this.taskSetId = readConf.scanTaskSetId();
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 7df9c75fb3..1760143d2c 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase {
               // status even belonging to the same catalog. Reference:
               // https://www.sqlite.org/inmemorydb.html
               CatalogProperties.CLIENT_POOL_SIZE,
-              "1"));
+              "1",
+              "include-credentials",
+              "true",
+              "gcs.oauth2.token",
+              "dummyToken"));
 
   protected static RESTCatalog restCatalog;
 
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 1e53710a0f..8e26a2f426 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -90,7 +90,7 @@ public class TestBaseReader {
     private final Map<String, CloseableIntegerRange> tracker = 
Maps.newHashMap();
 
     ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
-      super(table, new BaseCombinedScanTask(tasks), null, null, false, true);
+      super(table, table.io(), new BaseCombinedScanTask(tasks), null, null, 
false, true);
     }
 
     @Override
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
index b88f0233e2..be4391aab6 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -105,7 +105,8 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), 
table.schema(), false, true);
+          new ChangelogRowReader(
+              table, table.io(), taskGroup, table.schema(), table.schema(), 
false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -136,7 +137,8 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), 
table.schema(), false, true);
+          new ChangelogRowReader(
+              table, table.io(), taskGroup, table.schema(), table.schema(), 
false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -170,7 +172,8 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), 
table.schema(), false, true);
+          new ChangelogRowReader(
+              table, table.io(), taskGroup, table.schema(), table.schema(), 
false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -197,7 +200,8 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), 
table.schema(), false, true);
+          new ChangelogRowReader(
+              table, table.io(), taskGroup, table.schema(), table.schema(), 
false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index 764e1c6c93..92aace3dfd 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -180,6 +180,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             positionDeletesTable.schema(),
             projectedSchema,
@@ -220,6 +221,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
             positionDeletesTable.schema(),
             projectedSchema,
@@ -292,6 +294,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             positionDeletesTable.schema(),
             projectedSchema,
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index e16c9c2176..7dd4c6f7cf 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -329,7 +329,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests 
{
 
     for (CombinedScanTask task : tasks) {
       try (EqualityDeleteRowReader reader =
-          new EqualityDeleteRowReader(task, table, table.schema(), 
table.schema(), false, true)) {
+          new EqualityDeleteRowReader(
+              task, table, table.io(), table.schema(), table.schema(), false, 
true)) {
         while (reader.next()) {
           actualRowSet.add(
               new InternalRowWrapper(
@@ -675,6 +676,7 @@ public class TestSparkReaderDeletes extends DeleteReadTests 
{
           new BatchDataReader(
               // expected column is id, while the equality filter column is dt
               dateTable,
+              dateTable.io(),
               task,
               dateTable.schema(),
               dateTable.schema().select("id"),


Reply via email to