This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 176c729623 Spark 4.1: Rename expectedSchema to projection for clarity (#15366)
176c729623 is described below

commit 176c729623fd370f777d78330f404bce1494a631
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 19:12:47 2026 -0800

    Spark 4.1: Rename expectedSchema to projection for clarity (#15366)
---
 .../iceberg/spark/source/BatchDataReader.java      |  2 +-
 .../iceberg/spark/source/ChangelogRowReader.java   |  2 +-
 .../spark/source/PositionDeletesRowReader.java     |  2 +-
 .../apache/iceberg/spark/source/RowDataReader.java |  2 +-
 .../apache/iceberg/spark/source/SparkBatch.java    | 14 ++++----
 .../iceberg/spark/source/SparkBatchQueryScan.java  | 10 +++---
 .../iceberg/spark/source/SparkChangelogScan.java   | 12 +++----
 .../iceberg/spark/source/SparkCopyOnWriteScan.java |  8 ++---
 .../iceberg/spark/source/SparkInputPartition.java  | 16 ++++-----
 .../spark/source/SparkMicroBatchStream.java        |  8 ++---
 .../spark/source/SparkPartitioningAwareScan.java   |  6 ++--
 .../org/apache/iceberg/spark/source/SparkScan.java | 19 +++++------
 .../iceberg/spark/source/SparkScanBuilder.java     | 38 +++++++++++-----------
 13 files changed, 69 insertions(+), 70 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 81aeb3a4ae..9a4ab30fec 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -53,7 +53,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
     this(
         partition.table(),
         partition.taskGroup(),
-        partition.expectedSchema(),
+        partition.projection(),
         partition.isCaseSensitive(),
         parquetBatchReadConf,
         orcBatchReadConf,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 5054e28ac1..417440d4b4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -51,7 +51,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
     this(
         partition.table(),
         partition.taskGroup(),
-        partition.expectedSchema(),
+        partition.projection(),
         partition.isCaseSensitive(),
         partition.cacheDeleteFilesOnExecutors());
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index c93e9e7983..1a45facba6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -49,7 +49,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
     this(
         partition.table(),
         partition.taskGroup(),
-        partition.expectedSchema(),
+        partition.projection(),
         partition.isCaseSensitive(),
         partition.cacheDeleteFilesOnExecutors());
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index a4f3ecd8e3..0b53e72d99 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -48,7 +48,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
     this(
         partition.table(),
         partition.taskGroup(),
-        partition.expectedSchema(),
+        partition.projection(),
         partition.isCaseSensitive(),
         partition.cacheDeleteFilesOnExecutors());
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index d05d9a1657..a4d143fe93 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -50,7 +50,7 @@ class SparkBatch implements Batch {
   private final SparkReadConf readConf;
   private final Types.StructType groupingKeyType;
   private final List<? extends ScanTaskGroup<?>> taskGroups;
-  private final Schema expectedSchema;
+  private final Schema projection;
   private final boolean caseSensitive;
   private final boolean localityEnabled;
   private final boolean executorCacheLocalityEnabled;
@@ -63,14 +63,14 @@ class SparkBatch implements Batch {
       SparkReadConf readConf,
       Types.StructType groupingKeyType,
       List<? extends ScanTaskGroup<?>> taskGroups,
-      Schema expectedSchema,
+      Schema projection,
       int scanHashCode) {
     this.sparkContext = sparkContext;
     this.table = table;
     this.readConf = readConf;
     this.groupingKeyType = groupingKeyType;
     this.taskGroups = taskGroups;
-    this.expectedSchema = expectedSchema;
+    this.projection = projection;
     this.caseSensitive = readConf.caseSensitive();
     this.localityEnabled = readConf.localityEnabled();
     this.executorCacheLocalityEnabled = readConf.executorCacheLocalityEnabled();
@@ -83,7 +83,7 @@ class SparkBatch implements Batch {
     // broadcast the table metadata as input partitions will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+    String projectionString = SchemaParser.toJson(projection);
     String[][] locations = computePreferredLocations();
 
     InputPartition[] partitions = new InputPartition[taskGroups.size()];
@@ -94,7 +94,7 @@ class SparkBatch implements Batch {
               groupingKeyType,
               taskGroups.get(index),
               tableBroadcast,
-              expectedSchemaString,
+              projectionString,
               caseSensitive,
               locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
               cacheDeleteFilesOnExecutors);
@@ -150,7 +150,7 @@ class SparkBatch implements Batch {
   // - all tasks are of FileScanTask type and read only Parquet files
   private boolean useParquetBatchReads() {
     return readConf.parquetVectorizationEnabled()
-        && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
+        && projection.columns().stream().allMatch(this::supportsParquetBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
@@ -175,7 +175,7 @@ class SparkBatch implements Batch {
   private boolean useCometBatchReads() {
     return readConf.parquetVectorizationEnabled()
         && readConf.parquetReaderType() == ParquetReaderType.COMET
-        && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+        && projection.columns().stream().allMatch(this::supportsCometBatchReads)
         && taskGroups.stream().allMatch(this::supportsParquetBatchReads);
   }
 
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 0db1df3aeb..119c04d076 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -79,10 +79,10 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       Table table,
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
+    super(spark, table, scan, readConf, projection, filters, scanReportSupplier);
 
     this.snapshotId = readConf.snapshotId();
     this.startSnapshotId = readConf.startSnapshotId();
@@ -111,14 +111,14 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       }
     }
 
-    Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(expectedSchema());
+    Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(projection());
 
     // the optimizer will look for an equality condition with filter attributes in a join
     // as the scan has been already planned, filtering can only be done on projected attributes
     // that's why only partition source fields that are part of the read schema can be reported
 
     return partitionFieldSourceIds.stream()
-        .filter(fieldId -> expectedSchema().findField(fieldId) != null)
+        .filter(fieldId -> projection().findField(fieldId) != null)
         .map(fieldId -> Spark3Util.toNamedReference(quotedNameById.get(fieldId)))
         .toArray(NamedReference[]::new);
   }
@@ -199,7 +199,7 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       Expression expr = SparkV2Filters.convert(predicate);
       if (expr != null) {
         try {
-          Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
+          Binder.bind(projection().asStruct(), expr, caseSensitive());
           runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr);
         } catch (ValidationException e) {
           LOG.warn("Failed to bind {} to expected schema, skipping runtime filter", expr, e);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index c6c5edf45c..c9b33b60eb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -52,7 +52,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
   private final Table table;
   private final IncrementalChangelogScan scan;
   private final SparkReadConf readConf;
-  private final Schema expectedSchema;
+  private final Schema projection;
   private final List<Expression> filters;
   private final Long startSnapshotId;
   private final Long endSnapshotId;
@@ -66,17 +66,17 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
       Table table,
       IncrementalChangelogScan scan,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       boolean emptyScan) {
 
-    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), projection);
 
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.scan = scan;
     this.readConf = readConf;
-    this.expectedSchema = expectedSchema;
+    this.projection = projection;
     this.filters = filters != null ? filters : Collections.emptyList();
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
@@ -95,7 +95,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
   @Override
   public StructType readSchema() {
     if (expectedSparkType == null) {
-      this.expectedSparkType = SparkSchemaUtil.convert(expectedSchema);
+      this.expectedSparkType = SparkSchemaUtil.convert(projection);
     }
 
     return expectedSparkType;
@@ -109,7 +109,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
         readConf,
         EMPTY_GROUPING_KEY_TYPE,
         taskGroups(),
-        expectedSchema,
+        projection,
         hashCode());
   }
 
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index a0b531b3a5..0678236861 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -58,10 +58,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
       SparkSession spark,
       Table table,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    this(spark, table, null, null, readConf, expectedSchema, filters, scanReportSupplier);
+    this(spark, table, null, null, readConf, projection, filters, scanReportSupplier);
   }
 
   SparkCopyOnWriteScan(
@@ -70,10 +70,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
       BatchScan scan,
       Snapshot snapshot,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
+    super(spark, table, scan, readConf, projection, filters, scanReportSupplier);
 
     this.snapshot = snapshot;
 
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 8e3da5f57b..98a0061b3a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -34,25 +34,25 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
   private final Types.StructType groupingKeyType;
   private final ScanTaskGroup<?> taskGroup;
   private final Broadcast<Table> tableBroadcast;
-  private final String expectedSchemaString;
+  private final String projectionString;
   private final boolean caseSensitive;
   private final transient String[] preferredLocations;
   private final boolean cacheDeleteFilesOnExecutors;
 
-  private transient Schema expectedSchema = null;
+  private transient Schema projection = null;
 
   SparkInputPartition(
       Types.StructType groupingKeyType,
       ScanTaskGroup<?> taskGroup,
       Broadcast<Table> tableBroadcast,
-      String expectedSchemaString,
+      String projectionString,
       boolean caseSensitive,
       String[] preferredLocations,
       boolean cacheDeleteFilesOnExecutors) {
     this.groupingKeyType = groupingKeyType;
     this.taskGroup = taskGroup;
     this.tableBroadcast = tableBroadcast;
-    this.expectedSchemaString = expectedSchemaString;
+    this.projectionString = projectionString;
     this.caseSensitive = caseSensitive;
     this.preferredLocations = preferredLocations;
     this.cacheDeleteFilesOnExecutors = cacheDeleteFilesOnExecutors;
@@ -89,11 +89,11 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
     return cacheDeleteFilesOnExecutors;
   }
 
-  public Schema expectedSchema() {
-    if (expectedSchema == null) {
-      this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
+  public Schema projection() {
+    if (projection == null) {
+      this.projection = SchemaParser.fromJson(projectionString);
     }
 
-    return expectedSchema;
+    return projection;
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 036a205c97..496a5b133f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -61,7 +61,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
   private final Table table;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
-  private final String expectedSchema;
+  private final String projection;
   private final Broadcast<Table> tableBroadcast;
   private final long splitSize;
   private final int splitLookback;
@@ -79,12 +79,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
       JavaSparkContext sparkContext,
       Table table,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       String checkpointLocation) {
     this.table = table;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
-    this.expectedSchema = SchemaParser.toJson(expectedSchema);
+    this.projection = SchemaParser.toJson(projection);
     this.localityPreferred = readConf.localityEnabled();
     this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     this.splitSize = readConf.splitSize();
@@ -155,7 +155,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
               EMPTY_GROUPING_KEY_TYPE,
               combinedScanTasks.get(index),
               tableBroadcast,
-              expectedSchema,
+              projection,
               caseSensitive,
               locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
               cacheDeleteFilesOnExecutors);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index a70176253b..72c7047c24 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -76,10 +76,10 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
       Table table,
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
+    super(spark, table, readConf, projection, filters, scanReportSupplier);
 
     this.scan = scan;
     this.preserveDataGrouping = readConf.preserveDataGrouping();
@@ -129,7 +129,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
   }
 
   private StructType computeGroupingKeyType() {
-    return org.apache.iceberg.Partitioning.groupingKeyType(expectedSchema(), specs());
+    return org.apache.iceberg.Partitioning.groupingKeyType(projection(), specs());
   }
 
   private Transform[] groupingKeyTransforms() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 89c236e410..64cffe3791 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -105,7 +105,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   private final SparkSession spark;
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
-  private final Schema expectedSchema;
+  private final Schema projection;
   private final List<Expression> filters;
   private final String branch;
   private final Supplier<ScanReport> scanReportSupplier;
@@ -117,18 +117,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
       SparkSession spark,
       Table table,
       SparkReadConf readConf,
-      Schema expectedSchema,
+      Schema projection,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
     Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
-    SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
+    SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, projection);
 
     this.spark = spark;
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
-    this.expectedSchema = expectedSchema;
+    this.projection = projection;
     this.filters = filters != null ? filters : Collections.emptyList();
     this.branch = readConf.branch();
     this.scanReportSupplier = scanReportSupplier;
@@ -146,8 +146,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     return caseSensitive;
   }
 
-  protected Schema expectedSchema() {
-    return expectedSchema;
+  protected Schema projection() {
+    return projection;
   }
 
   protected List<Expression> filters() {
@@ -171,19 +171,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   @Override
   public Batch toBatch() {
     return new SparkBatch(
-        sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
+        sparkContext, table, readConf, groupingKeyType(), taskGroups(), projection, hashCode());
   }
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(
-        sparkContext, table, readConf, expectedSchema, checkpointLocation);
+    return new SparkMicroBatchStream(sparkContext, table, readConf, projection, checkpointLocation);
   }
 
   @Override
   public StructType readSchema() {
     if (readSchema == null) {
-      this.readSchema = SparkSchemaUtil.convert(expectedSchema);
+      this.readSchema = SparkSchemaUtil.convert(projection);
     }
     return readSchema;
   }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index ab1fb55701..fb84b3e35c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -226,18 +226,18 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
   }
 
   private Scan buildBatchScan() {
-    Schema expectedSchema = projectionWithMetadataColumns();
+    Schema projection = projectionWithMetadataColumns();
     return new SparkBatchQueryScan(
         spark(),
         table(),
-        buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema),
+        buildIcebergBatchScan(false /* not include Column Stats */, projection),
         readConf(),
-        expectedSchema,
+        projection,
         filters(),
         metricsReporter()::scanReport);
   }
 
-  private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema expectedSchema) {
+  private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema projection) {
     Long snapshotId = readConf().snapshotId();
     Long asOfTimestamp = readConf().asOfTimestamp();
     String branch = readConf().branch();
@@ -278,9 +278,9 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
         SparkReadOptions.END_TIMESTAMP);
 
     if (startSnapshotId != null) {
-      return buildIncrementalAppendScan(startSnapshotId, endSnapshotId, withStats, expectedSchema);
+      return buildIncrementalAppendScan(startSnapshotId, endSnapshotId, withStats, projection);
     } else {
-      return buildBatchScan(snapshotId, asOfTimestamp, branch, tag, withStats, expectedSchema);
+      return buildBatchScan(snapshotId, asOfTimestamp, branch, tag, withStats, projection);
     }
   }
 
@@ -290,12 +290,12 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
       String branch,
       String tag,
       boolean withStats,
-      Schema expectedSchema) {
+      Schema projection) {
     BatchScan scan =
         newBatchScan()
             .caseSensitive(caseSensitive())
             .filter(filter())
-            .project(expectedSchema)
+            .project(projection)
             .metricsReporter(metricsReporter());
 
     if (withStats) {
@@ -322,14 +322,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
   }
 
   private org.apache.iceberg.Scan buildIncrementalAppendScan(
-      long startSnapshotId, Long endSnapshotId, boolean withStats, Schema expectedSchema) {
+      long startSnapshotId, Long endSnapshotId, boolean withStats, Schema projection) {
     IncrementalAppendScan scan =
         table()
             .newIncrementalAppendScan()
             .fromSnapshotExclusive(startSnapshotId)
             .caseSensitive(caseSensitive())
             .filter(filter())
-            .project(expectedSchema)
+            .project(projection)
             .metricsReporter(metricsReporter());
 
     if (withStats) {
@@ -398,14 +398,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
       }
     }
 
-    Schema expectedSchema = projectionWithMetadataColumns();
+    Schema projection = projectionWithMetadataColumns();
 
     IncrementalChangelogScan scan =
         table()
             .newIncrementalChangelogScan()
             .caseSensitive(caseSensitive())
             .filter(filter())
-            .project(expectedSchema)
+            .project(projection)
             .metricsReporter(metricsReporter());
 
     if (startSnapshotId != null) {
@@ -419,7 +419,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
     scan = configureSplitPlanning(scan);
 
     return new SparkChangelogScan(
-        spark(), table(), scan, readConf(), expectedSchema, filters(), emptyScan);
+        spark(), table(), scan, readConf(), projection, filters(), emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
@@ -482,14 +482,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
     SparkReadConf adjustedReadConf =
         new SparkReadConf(spark(), table(), readConf().branch(), adjustedOptions);
 
-    Schema expectedSchema = projectionWithMetadataColumns();
+    Schema projection = projectionWithMetadataColumns();
 
     BatchScan scan =
         newBatchScan()
             .useSnapshot(snapshotId)
             .caseSensitive(caseSensitive())
             .filter(filter())
-            .project(expectedSchema)
+            .project(projection)
             .metricsReporter(metricsReporter());
 
     scan = configureSplitPlanning(scan);
@@ -499,7 +499,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
         table(),
         scan,
         adjustedReadConf,
-        expectedSchema,
+        projection,
         filters(),
         metricsReporter()::scanReport);
   }
@@ -517,7 +517,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
           metricsReporter()::scanReport);
     }
 
-    Schema expectedSchema = projectionWithMetadataColumns();
+    Schema projection = projectionWithMetadataColumns();
 
     BatchScan scan =
         newBatchScan()
@@ -525,7 +525,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
             .ignoreResiduals()
             .caseSensitive(caseSensitive())
             .filter(filter())
-            .project(expectedSchema)
+            .project(projection)
             .metricsReporter(metricsReporter());
 
     scan = configureSplitPlanning(scan);
@@ -536,7 +536,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
         scan,
         snapshot,
         readConf(),
-        expectedSchema,
+        projection,
         filters(),
         metricsReporter()::scanReport);
   }

Reply via email to