This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new be25b80f7f Spark 4.1: Pass FileIO on Spark's read path (#15448)
be25b80f7f is described below

commit be25b80f7fa06406343f894463b72ded5f383e9e
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Mar 19 15:18:48 2026 +0100

    Spark 4.1: Pass FileIO on Spark's read path (#15448)
---
 .../TestRemoteScanPlanning.java                    |  47 +++++++-
 .../iceberg/spark/source/BaseBatchReader.java      |   4 +-
 .../apache/iceberg/spark/source/BaseReader.java    |   9 +-
 .../apache/iceberg/spark/source/BaseRowReader.java |   4 +-
 .../iceberg/spark/source/BatchDataReader.java      |   4 +
 .../iceberg/spark/source/ChangelogRowReader.java   |   4 +
 .../spark/source/EqualityDeleteRowReader.java      |   4 +-
 .../spark/source/PositionDeletesRowReader.java     |   6 +-
 .../apache/iceberg/spark/source/RowDataReader.java |   5 +-
 .../spark/source/SerializableFileIOWithSize.java   | 120 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkBatch.java    |  10 +-
 .../iceberg/spark/source/SparkChangelogScan.java   |   1 +
 .../iceberg/spark/source/SparkInputPartition.java  |   8 ++
 .../spark/source/SparkMicroBatchStream.java        |  11 +-
 .../spark/source/SparkPartitioningAwareScan.java   |  10 +-
 .../org/apache/iceberg/spark/source/SparkScan.java |  16 ++-
 .../iceberg/spark/source/SparkStagedScan.java      |   2 +-
 .../apache/iceberg/spark/TestBaseWithCatalog.java  |   6 +-
 .../iceberg/spark/source/TestBaseReader.java       |   2 +-
 .../iceberg/spark/source/TestChangelogReader.java  |   8 +-
 .../spark/source/TestPositionDeletesReader.java    |   3 +
 .../spark/source/TestSparkReaderDeletes.java       |  11 +-
 22 files changed, 269 insertions(+), 26 deletions(-)

diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
similarity index 53%
rename from 
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
rename to 
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
index 9c31eb970b..0152c8e0e6 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
@@ -16,15 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.spark.extensions;
+package org.apache.iceberg.spark.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTCatalogProperties;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.sql.TestSelect;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ public class TestRemoteScanPlanning extends TestSelect {
         ImmutableMap.builder()
             .putAll(SparkCatalogConfig.REST.properties())
             .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
-            // this flag is typically only set by the server, but we set it 
from the client for
-            // testing
             .put(
                 RESTCatalogProperties.SCAN_PLANNING_MODE,
                 RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ public class TestRemoteScanPlanning extends TestSelect {
       }
     };
   }
+
+  /**
+   * Loads the test table through a dedicated {@link RESTCatalog} configured for server-side scan
+   * planning and checks, for both a regular scan and a copy-on-write scan, that the resulting
+   * {@link Batch} exposes a {@link FileIO} carrying the REST scan plan id.
+   *
+   * @throws Exception if closing the catalog created for this test fails
+   */
+  @TestTemplate
+  public void fileIOIsPropagated() throws Exception {
+    // RESTCatalog is Closeable: close the instance created for this test so that its resources
+    // are released once the assertions have run
+    try (RESTCatalog catalog = new RESTCatalog()) {
+      catalog.setConf(new Configuration());
+      catalog.initialize(
+          "test",
+          ImmutableMap.<String, String>builder()
+              .putAll(restCatalog.properties())
+              .put(
+                  RESTCatalogProperties.SCAN_PLANNING_MODE,
+                  RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
+              .build());
+      Table table = catalog.loadTable(tableIdent);
+
+      SparkScanBuilder builder =
+          new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+      verifyFileIOHasPlanId(builder.build().toBatch(), table);
+      verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
+    }
+  }
+
+  // Extracts the "fileIO" supplier from the given batch and asserts that the FileIO it supplies
+  // has the REST scan plan id among its properties, while the table's own FileIO does not.
+  private void verifyFileIOHasPlanId(Batch batch, Table table) {
+    Supplier<?> scanIOSupplier =
+        assertThat(batch)
+            .extracting("fileIO")
+            .isInstanceOf(Supplier.class)
+            .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
+            .actual();
+    FileIO fileIOForScan = (FileIO) scanIOSupplier.get();
+
+    assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+  }
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 2dd6a52c26..a2af0964c6 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -49,13 +50,14 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
 
   BaseBatchReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       ParquetBatchReadConf parquetConf,
       OrcBatchReadConf orcConf,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
     this.parquetConf = parquetConf;
     this.orcConf = orcConf;
   }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 8adc38f2ae..a4d9766ae7 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
@@ -68,6 +69,7 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
 
   private final Table table;
+  private final EncryptingFileIO fileIO;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
   private final NameMapping nameMapping;
@@ -83,11 +85,13 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
 
   BaseReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<TaskT> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     this.table = table;
+    this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
     this.taskGroup = taskGroup;
     this.tasks = taskGroup.tasks().iterator();
     this.currentIterator = CloseableIterator.empty();
@@ -179,9 +183,8 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
   private Map<String, InputFile> inputFiles() {
     if (lazyInputFiles == null) {
       this.lazyInputFiles =
-          EncryptingFileIO.combine(table().io(), table().encryption())
-              .bulkDecrypt(
-                  () -> 
taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
+          fileIO.bulkDecrypt(
+              () -> 
taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
     }
 
     return lazyInputFiles;
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 14febb212a..cbc40db533 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -28,17 +28,19 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends 
BaseReader<InternalRow, T> {
   BaseRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<T> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
   }
 
   protected CloseableIterable<InternalRow> newIterable(
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 9a4ab30fec..237dfd5c69 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -52,6 +53,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       OrcBatchReadConf orcBatchReadConf) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -62,6 +64,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
 
   BatchDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
@@ -70,6 +73,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
         taskGroup,
         expectedSchema,
         caseSensitive,
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 417440d4b4..eb8e5e63f4 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -50,6 +51,7 @@ class ChangelogRowReader extends 
BaseRowReader<ChangelogScanTask>
   ChangelogRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -58,12 +60,14 @@ class ChangelogRowReader extends 
BaseRowReader<ChangelogScanTask>
 
   ChangelogRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<ChangelogScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
     super(
         table,
+        fileIO,
         taskGroup,
         ChangelogUtil.dropChangelogMetadata(expectedSchema),
         caseSensitive,
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index 5c4c21df34..aae399c5f2 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -32,10 +33,11 @@ public class EqualityDeleteRowReader extends RowDataReader {
   public EqualityDeleteRowReader(
       CombinedScanTask task,
       Table table,
+      FileIO fileIO,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-    super(table, task, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(table, fileIO, task, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
   }
 
   @Override
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a45facba6..b14970722e 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -48,6 +49,7 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
   PositionDeletesRowReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -56,12 +58,12 @@ class PositionDeletesRowReader extends 
BaseRowReader<PositionDeletesScanTask>
 
   PositionDeletesRowReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<PositionDeletesScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
-
-    super(table, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
 
     int numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} position delete file split(s) for table {}", 
numSplits, table.name());
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 0b53e72d99..dbfb0b7614 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -47,6 +48,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> 
implements PartitionRead
   RowDataReader(SparkInputPartition partition) {
     this(
         partition.table(),
+        partition.io(),
         partition.taskGroup(),
         partition.projection(),
         partition.isCaseSensitive(),
@@ -55,12 +57,13 @@ class RowDataReader extends BaseRowReader<FileScanTask> 
implements PartitionRead
 
   RowDataReader(
       Table table,
+      FileIO fileIO,
       ScanTaskGroup<FileScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive,
       boolean cacheDeleteFilesOnExecutors) {
 
-    super(table, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
+    super(table, fileIO, taskGroup, expectedSchema, caseSensitive, 
cacheDeleteFilesOnExecutors);
 
     numSplits = taskGroup.tasks().size();
     LOG.debug("Reading {} file split(s) for table {}", numSplits, 
table.name());
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
new file mode 100644
index 0000000000..49189d9d57
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.spark.util.KnownSizeEstimation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a serializable {@link FileIO} with a known size estimate. Spark calls its
+ * {@link org.apache.spark.util.SizeEstimator} class when broadcasting variables and this can be an
+ * expensive operation, so providing a known size estimate allows that operation to be skipped.
+ *
+ * <p>File operations are forwarded to the wrapped {@link FileIO}. The {@link HadoopConfigurable}
+ * methods are forwarded only when the wrapped instance is itself {@link HadoopConfigurable};
+ * otherwise they are no-ops and {@link #getConf()} returns {@code null}.
+ *
+ * <p>This class also implements {@link AutoCloseable} to avoid leaking resources upon broadcasting.
+ * Broadcast variables are destroyed and cleaned up on the driver and executors once they are
+ * garbage collected on the driver. The implementation ensures only resources used by copies of the
+ * main {@link FileIO} are released: {@link #close()} closes the wrapped {@link FileIO} only when
+ * the transient marker field is {@code null}, which is never the case for the instance returned by
+ * {@link #wrap(FileIO)}.
+ */
+class SerializableFileIOWithSize
+    implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(SerializableFileIOWithSize.class);
+  private static final long SIZE_ESTIMATE = 32_768L;
+  // transient, so it is not written during Java serialization and is null on deserialized copies
+  private final transient Object serializationMarker;
+  private final FileIO fileIO;
+
+  private SerializableFileIOWithSize(FileIO fileIO) {
+    this.fileIO = fileIO;
+    this.serializationMarker = new Object();
+  }
+
+  /** Returns the fixed size estimate reported to Spark instead of a computed one. */
+  @Override
+  public long estimatedSize() {
+    return SIZE_ESTIMATE;
+  }
+
+  /**
+   * Wraps the given {@link FileIO} so that it reports a known size estimate.
+   *
+   * @param fileIO the {@link FileIO} to delegate to
+   * @return a wrapper around {@code fileIO}
+   */
+  public static FileIO wrap(FileIO fileIO) {
+    return new SerializableFileIOWithSize(fileIO);
+  }
+
+  /** Closes the wrapped {@link FileIO}, but only when the transient marker is {@code null}. */
+  @Override
+  public void close() {
+    if (null == serializationMarker) {
+      LOG.debug("Closing FileIO");
+      fileIO.close();
+    }
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return fileIO.newInputFile(path);
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    // Forward the known length explicitly. Without this override the FileIO interface default
+    // would be used, which ignores the length and calls newInputFile(path), so the wrapped FileIO
+    // would never see the file size supplied by the caller.
+    return fileIO.newInputFile(path, length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return fileIO.newOutputFile(path);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    fileIO.deleteFile(path);
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    fileIO.initialize(properties);
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return fileIO.properties();
+  }
+
+  @Override
+  public void serializeConfWith(
+      Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.serializeConfWith(confSerializer);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (fileIO instanceof HadoopConfigurable configurable) {
+      configurable.setConf(conf);
+    }
+  }
+
+  /**
+   * Returns the Hadoop configuration of the wrapped {@link FileIO}, or {@code null} when the
+   * wrapped instance is not {@link HadoopConfigurable}.
+   */
+  @Override
+  public Configuration getConf() {
+    if (fileIO instanceof HadoopConfigurable hadoopConfigurable) {
+      return hadoopConfigurable.getConf();
+    }
+
+    return null;
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index f4946c6404..22a4b171b3 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -28,6 +29,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
 import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
 import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -45,6 +47,7 @@ class SparkBatch implements Batch {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final SparkReadConf readConf;
   private final Types.StructType groupingKeyType;
   private final List<? extends ScanTaskGroup<?>> taskGroups;
@@ -58,6 +61,7 @@ class SparkBatch implements Batch {
   SparkBatch(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Types.StructType groupingKeyType,
       List<? extends ScanTaskGroup<?>> taskGroups,
@@ -65,6 +69,7 @@ class SparkBatch implements Batch {
       int scanHashCode) {
     this.sparkContext = sparkContext;
     this.table = table;
+    this.fileIO = fileIO;
     this.readConf = readConf;
     this.groupingKeyType = groupingKeyType;
     this.taskGroups = taskGroups;
@@ -81,6 +86,8 @@ class SparkBatch implements Batch {
     // broadcast the table metadata as input partitions will be sent to 
executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    Broadcast<FileIO> fileIOBroadcast =
+        sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
     String projectionString = SchemaParser.toJson(projection);
     String[][] locations = computePreferredLocations();
 
@@ -92,6 +99,7 @@ class SparkBatch implements Batch {
               groupingKeyType,
               taskGroups.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               projectionString,
               caseSensitive,
               locations != null ? locations[index] : 
SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -103,7 +111,7 @@ class SparkBatch implements Batch {
 
   private String[][] computePreferredLocations() {
     if (localityEnabled) {
-      return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups);
+      return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
 
     } else if (executorCacheLocalityEnabled) {
       List<String> executorLocations = SparkUtil.executorLocations();
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 4425c4936a..57ccf92b96 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -105,6 +105,7 @@ class SparkChangelogScan implements Scan, 
SupportsReportStatistics {
     return new SparkBatch(
         sparkContext,
         table,
+        null != scan ? scan.fileIO() : table::io,
         readConf,
         EMPTY_GROUPING_KEY_TYPE,
         taskGroups(),
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 98a0061b3a..a3d78b43a9 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
   private final Types.StructType groupingKeyType;
   private final ScanTaskGroup<?> taskGroup;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final String projectionString;
   private final boolean caseSensitive;
   private final transient String[] preferredLocations;
@@ -45,6 +47,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
       Types.StructType groupingKeyType,
       ScanTaskGroup<?> taskGroup,
       Broadcast<Table> tableBroadcast,
+      Broadcast<FileIO> fileIOBroadcast,
       String projectionString,
       boolean caseSensitive,
       String[] preferredLocations,
@@ -52,6 +55,7 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
     this.groupingKeyType = groupingKeyType;
     this.taskGroup = taskGroup;
     this.tableBroadcast = tableBroadcast;
+    this.fileIOBroadcast = fileIOBroadcast;
     this.projectionString = projectionString;
     this.caseSensitive = caseSensitive;
     this.preferredLocations = preferredLocations;
@@ -81,6 +85,10 @@ class SparkInputPartition implements InputPartition, 
HasPartitionKey, Serializab
     return tableBroadcast.value();
   }
 
+  /** Returns the {@link FileIO} held by the broadcast variable this partition was created with. */
+  public FileIO io() {
+    return fileIOBroadcast.value();
+  }
+
   public boolean isCaseSensitive() {
     return caseSensitive;
   }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index c9a0f2566b..7adf3c633c 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
@@ -61,10 +62,12 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = 
Types.StructType.of();
 
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final String projection;
   private final Broadcast<Table> tableBroadcast;
+  private final Broadcast<FileIO> fileIOBroadcast;
   private final long splitSize;
   private final int splitLookback;
   private final long splitOpenFileCost;
@@ -80,15 +83,18 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
       Table table,
+      Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema projection,
       String checkpointLocation) {
     this.table = table;
+    this.fileIO = fileIO;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
     this.projection = SchemaParser.toJson(projection);
     this.localityPreferred = readConf.localityEnabled();
     this.tableBroadcast = 
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    this.fileIOBroadcast = 
sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
@@ -158,6 +164,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
               EMPTY_GROUPING_KEY_TYPE,
               combinedScanTasks.get(index),
               tableBroadcast,
+              fileIOBroadcast,
               projection,
               caseSensitive,
               locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -168,7 +175,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
   }
 
   private String[][] computePreferredLocations(List<CombinedScanTask> taskGroups) {
-    return localityPreferred ? SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null;
+    return localityPreferred
+        ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups)
+        : null;
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index dff844eb45..fe5eeee8fb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -79,7 +79,15 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
       Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, schema, readConf, projection, filters, scanReportSupplier);
+    super(
+        spark,
+        table,
+        null != scan ? scan.fileIO() : table::io,
+        schema,
+        readConf,
+        projection,
+        filters,
+        scanReportSupplier);
 
     this.scan = scan;
     this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 2e5f50ea88..6b80199a25 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,6 +102,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
+  private final Supplier<FileIO> fileIO;
   private final Schema schema;
   private final SparkSession spark;
   private final SparkReadConf readConf;
@@ -115,6 +117,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   SparkScan(
       SparkSession spark,
       Table table,
+      Supplier<FileIO> fileIO,
       Schema schema,
       SparkReadConf readConf,
       Schema projection,
@@ -123,6 +126,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.spark = spark;
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
+    this.fileIO = fileIO;
     this.schema = schema;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
@@ -169,12 +173,20 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   @Override
   public Batch toBatch() {
     return new SparkBatch(
-        sparkContext, table, readConf, groupingKeyType(), taskGroups(), projection, hashCode());
+        sparkContext,
+        table,
+        fileIO,
+        readConf,
+        groupingKeyType(),
+        taskGroups(),
+        projection,
+        hashCode());
   }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(sparkContext, table, readConf, projection, checkpointLocation);
+    return new SparkMicroBatchStream(
+        sparkContext, table, fileIO, readConf, projection, checkpointLocation);
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index e4412f5cba..47481ec51c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -47,7 +47,7 @@ class SparkStagedScan extends SparkScan {
       Schema projection,
       String taskSetId,
       SparkReadConf readConf) {
-    super(spark, table, schema, readConf, projection, ImmutableList.of(), null);
+    super(spark, table, table::io, schema, readConf, projection, ImmutableList.of(), null);
     this.taskSetId = taskSetId;
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 7df9c75fb3..1760143d2c 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase {
               // status even belonging to the same catalog. Reference:
               // https://www.sqlite.org/inmemorydb.html
               CatalogProperties.CLIENT_POOL_SIZE,
-              "1"));
+              "1",
+              "include-credentials",
+              "true",
+              "gcs.oauth2.token",
+              "dummyToken"));
 
   protected static RESTCatalog restCatalog;
 
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 5922723096..0eb9bbe52f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -90,7 +90,7 @@ public class TestBaseReader {
     private final Map<String, CloseableIntegerRange> tracker = Maps.newHashMap();
 
     ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
-      super(table, new BaseCombinedScanTask(tasks), null, false, true);
+      super(table, table.io(), new BaseCombinedScanTask(tasks), null, false, true);
     }
 
     @Override
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
index c31c10f97c..b12fdd443f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -105,7 +105,7 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -136,7 +136,7 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -170,7 +170,7 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
@@ -197,7 +197,7 @@ public class TestChangelogReader extends TestBase {
 
     for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
       ChangelogRowReader reader =
-          new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+          new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
       while (reader.next()) {
         rows.add(reader.get().copy());
       }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index f4ace848af..681ab1fd76 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -180,6 +180,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             projectedSchema,
             false,
@@ -219,6 +220,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
             projectedSchema,
             false,
@@ -290,6 +292,7 @@ public class TestPositionDeletesReader extends TestBase {
     try (PositionDeletesRowReader reader =
         new PositionDeletesRowReader(
             table,
+            table.io(),
             new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
             projectedSchema,
             false,
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index a58fc8bdb9..a5bf39a5a6 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -332,7 +332,7 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
 
     for (CombinedScanTask task : tasks) {
       try (EqualityDeleteRowReader reader =
-          new EqualityDeleteRowReader(task, table, table.schema(), false, true)) {
+          new EqualityDeleteRowReader(task, table, table.io(), table.schema(), false, true)) {
         while (reader.next()) {
           actualRowSet.add(
               new InternalRowWrapper(
@@ -677,7 +677,14 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
       try (BatchDataReader reader =
           new BatchDataReader(
               // expected column is id, while the equality filter column is dt
-              dateTable, task, dateTable.schema().select("id"), false, conf, null, true)) {
+              dateTable,
+              dateTable.io(),
+              task,
+              dateTable.schema().select("id"),
+              false,
+              conf,
+              null,
+              true)) {
         while (reader.next()) {
           ColumnarBatch columnarBatch = reader.get();
           int numOfCols = columnarBatch.numCols();


Reply via email to