This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 176c729623 Spark 4.1: Rename expectedSchema to projection for clarity (#15366)
176c729623 is described below
commit 176c729623fd370f777d78330f404bce1494a631
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 19:12:47 2026 -0800
Spark 4.1: Rename expectedSchema to projection for clarity (#15366)
---
.../iceberg/spark/source/BatchDataReader.java | 2 +-
.../iceberg/spark/source/ChangelogRowReader.java | 2 +-
.../spark/source/PositionDeletesRowReader.java | 2 +-
.../apache/iceberg/spark/source/RowDataReader.java | 2 +-
.../apache/iceberg/spark/source/SparkBatch.java | 14 ++++----
.../iceberg/spark/source/SparkBatchQueryScan.java | 10 +++---
.../iceberg/spark/source/SparkChangelogScan.java | 12 +++----
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 8 ++---
.../iceberg/spark/source/SparkInputPartition.java | 16 ++++-----
.../spark/source/SparkMicroBatchStream.java | 8 ++---
.../spark/source/SparkPartitioningAwareScan.java | 6 ++--
.../org/apache/iceberg/spark/source/SparkScan.java | 19 +++++------
.../iceberg/spark/source/SparkScanBuilder.java | 38 +++++++++++-----------
13 files changed, 69 insertions(+), 70 deletions(-)
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 81aeb3a4ae..9a4ab30fec 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -53,7 +53,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
this(
partition.table(),
partition.taskGroup(),
- partition.expectedSchema(),
+ partition.projection(),
partition.isCaseSensitive(),
parquetBatchReadConf,
orcBatchReadConf,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 5054e28ac1..417440d4b4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -51,7 +51,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
this(
partition.table(),
partition.taskGroup(),
- partition.expectedSchema(),
+ partition.projection(),
partition.isCaseSensitive(),
partition.cacheDeleteFilesOnExecutors());
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index c93e9e7983..1a45facba6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -49,7 +49,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
this(
partition.table(),
partition.taskGroup(),
- partition.expectedSchema(),
+ partition.projection(),
partition.isCaseSensitive(),
partition.cacheDeleteFilesOnExecutors());
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index a4f3ecd8e3..0b53e72d99 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -48,7 +48,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
this(
partition.table(),
partition.taskGroup(),
- partition.expectedSchema(),
+ partition.projection(),
partition.isCaseSensitive(),
partition.cacheDeleteFilesOnExecutors());
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index d05d9a1657..a4d143fe93 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -50,7 +50,7 @@ class SparkBatch implements Batch {
private final SparkReadConf readConf;
private final Types.StructType groupingKeyType;
private final List<? extends ScanTaskGroup<?>> taskGroups;
- private final Schema expectedSchema;
+ private final Schema projection;
private final boolean caseSensitive;
private final boolean localityEnabled;
private final boolean executorCacheLocalityEnabled;
@@ -63,14 +63,14 @@ class SparkBatch implements Batch {
SparkReadConf readConf,
Types.StructType groupingKeyType,
List<? extends ScanTaskGroup<?>> taskGroups,
- Schema expectedSchema,
+ Schema projection,
int scanHashCode) {
this.sparkContext = sparkContext;
this.table = table;
this.readConf = readConf;
this.groupingKeyType = groupingKeyType;
this.taskGroups = taskGroups;
- this.expectedSchema = expectedSchema;
+ this.projection = projection;
this.caseSensitive = readConf.caseSensitive();
this.localityEnabled = readConf.localityEnabled();
this.executorCacheLocalityEnabled = readConf.executorCacheLocalityEnabled();
@@ -83,7 +83,7 @@ class SparkBatch implements Batch {
// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
- String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+ String projectionString = SchemaParser.toJson(projection);
String[][] locations = computePreferredLocations();
InputPartition[] partitions = new InputPartition[taskGroups.size()];
@@ -94,7 +94,7 @@ class SparkBatch implements Batch {
groupingKeyType,
taskGroups.get(index),
tableBroadcast,
- expectedSchemaString,
+ projectionString,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
cacheDeleteFilesOnExecutors);
@@ -150,7 +150,7 @@ class SparkBatch implements Batch {
// - all tasks are of FileScanTask type and read only Parquet files
private boolean useParquetBatchReads() {
return readConf.parquetVectorizationEnabled()
- && expectedSchema.columns().stream().allMatch(this::supportsParquetBatchReads)
+ && projection.columns().stream().allMatch(this::supportsParquetBatchReads)
&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}
@@ -175,7 +175,7 @@ class SparkBatch implements Batch {
private boolean useCometBatchReads() {
return readConf.parquetVectorizationEnabled()
&& readConf.parquetReaderType() == ParquetReaderType.COMET
- && expectedSchema.columns().stream().allMatch(this::supportsCometBatchReads)
+ && projection.columns().stream().allMatch(this::supportsCometBatchReads)
&& taskGroups.stream().allMatch(this::supportsParquetBatchReads);
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 0db1df3aeb..119c04d076 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -79,10 +79,10 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
Table table,
Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
+ super(spark, table, scan, readConf, projection, filters, scanReportSupplier);
this.snapshotId = readConf.snapshotId();
this.startSnapshotId = readConf.startSnapshotId();
@@ -111,14 +111,14 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
}
}
- Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(expectedSchema());
+ Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(projection());
// the optimizer will look for an equality condition with filter attributes in a join
// as the scan has been already planned, filtering can only be done on projected attributes
// that's why only partition source fields that are part of the read schema can be reported
return partitionFieldSourceIds.stream()
- .filter(fieldId -> expectedSchema().findField(fieldId) != null)
+ .filter(fieldId -> projection().findField(fieldId) != null)
.map(fieldId -> Spark3Util.toNamedReference(quotedNameById.get(fieldId)))
.toArray(NamedReference[]::new);
}
@@ -199,7 +199,7 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
Expression expr = SparkV2Filters.convert(predicate);
if (expr != null) {
try {
- Binder.bind(expectedSchema().asStruct(), expr, caseSensitive());
+ Binder.bind(projection().asStruct(), expr, caseSensitive());
runtimeFilterExpr = Expressions.and(runtimeFilterExpr, expr);
} catch (ValidationException e) {
LOG.warn("Failed to bind {} to expected schema, skipping runtime filter", expr, e);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index c6c5edf45c..c9b33b60eb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -52,7 +52,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
private final Table table;
private final IncrementalChangelogScan scan;
private final SparkReadConf readConf;
- private final Schema expectedSchema;
+ private final Schema projection;
private final List<Expression> filters;
private final Long startSnapshotId;
private final Long endSnapshotId;
@@ -66,17 +66,17 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
Table table,
IncrementalChangelogScan scan,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
boolean emptyScan) {
- SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+ SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), projection);
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.scan = scan;
this.readConf = readConf;
- this.expectedSchema = expectedSchema;
+ this.projection = projection;
this.filters = filters != null ? filters : Collections.emptyList();
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
@@ -95,7 +95,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
@Override
public StructType readSchema() {
if (expectedSparkType == null) {
- this.expectedSparkType = SparkSchemaUtil.convert(expectedSchema);
+ this.expectedSparkType = SparkSchemaUtil.convert(projection);
}
return expectedSparkType;
@@ -109,7 +109,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
readConf,
EMPTY_GROUPING_KEY_TYPE,
taskGroups(),
- expectedSchema,
+ projection,
hashCode());
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index a0b531b3a5..0678236861 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -58,10 +58,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
SparkSession spark,
Table table,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- this(spark, table, null, null, readConf, expectedSchema, filters, scanReportSupplier);
+ this(spark, table, null, null, readConf, projection, filters, scanReportSupplier);
}
SparkCopyOnWriteScan(
@@ -70,10 +70,10 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
BatchScan scan,
Snapshot snapshot,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
+ super(spark, table, scan, readConf, projection, filters, scanReportSupplier);
this.snapshot = snapshot;
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 8e3da5f57b..98a0061b3a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -34,25 +34,25 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
private final Types.StructType groupingKeyType;
private final ScanTaskGroup<?> taskGroup;
private final Broadcast<Table> tableBroadcast;
- private final String expectedSchemaString;
+ private final String projectionString;
private final boolean caseSensitive;
private final transient String[] preferredLocations;
private final boolean cacheDeleteFilesOnExecutors;
- private transient Schema expectedSchema = null;
+ private transient Schema projection = null;
SparkInputPartition(
Types.StructType groupingKeyType,
ScanTaskGroup<?> taskGroup,
Broadcast<Table> tableBroadcast,
- String expectedSchemaString,
+ String projectionString,
boolean caseSensitive,
String[] preferredLocations,
boolean cacheDeleteFilesOnExecutors) {
this.groupingKeyType = groupingKeyType;
this.taskGroup = taskGroup;
this.tableBroadcast = tableBroadcast;
- this.expectedSchemaString = expectedSchemaString;
+ this.projectionString = projectionString;
this.caseSensitive = caseSensitive;
this.preferredLocations = preferredLocations;
this.cacheDeleteFilesOnExecutors = cacheDeleteFilesOnExecutors;
@@ -89,11 +89,11 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
return cacheDeleteFilesOnExecutors;
}
- public Schema expectedSchema() {
- if (expectedSchema == null) {
- this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
+ public Schema projection() {
+ if (projection == null) {
+ this.projection = SchemaParser.fromJson(projectionString);
}
- return expectedSchema;
+ return projection;
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 036a205c97..496a5b133f 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -61,7 +61,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private final Table table;
private final SparkReadConf readConf;
private final boolean caseSensitive;
- private final String expectedSchema;
+ private final String projection;
private final Broadcast<Table> tableBroadcast;
private final long splitSize;
private final int splitLookback;
@@ -79,12 +79,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
JavaSparkContext sparkContext,
Table table,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
String checkpointLocation) {
this.table = table;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
- this.expectedSchema = SchemaParser.toJson(expectedSchema);
+ this.projection = SchemaParser.toJson(projection);
this.localityPreferred = readConf.localityEnabled();
this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
this.splitSize = readConf.splitSize();
@@ -155,7 +155,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
EMPTY_GROUPING_KEY_TYPE,
combinedScanTasks.get(index),
tableBroadcast,
- expectedSchema,
+ projection,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
cacheDeleteFilesOnExecutors);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index a70176253b..72c7047c24 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -76,10 +76,10 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
Table table,
Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
+ super(spark, table, readConf, projection, filters, scanReportSupplier);
this.scan = scan;
this.preserveDataGrouping = readConf.preserveDataGrouping();
@@ -129,7 +129,7 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
}
private StructType computeGroupingKeyType() {
- return org.apache.iceberg.Partitioning.groupingKeyType(expectedSchema(), specs());
+ return org.apache.iceberg.Partitioning.groupingKeyType(projection(), specs());
}
private Transform[] groupingKeyTransforms() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 89c236e410..64cffe3791 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -105,7 +105,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
private final SparkSession spark;
private final SparkReadConf readConf;
private final boolean caseSensitive;
- private final Schema expectedSchema;
+ private final Schema projection;
private final List<Expression> filters;
private final String branch;
private final Supplier<ScanReport> scanReportSupplier;
@@ -117,18 +117,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
SparkSession spark,
Table table,
SparkReadConf readConf,
- Schema expectedSchema,
+ Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
- SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
+ SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, projection);
this.spark = spark;
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
- this.expectedSchema = expectedSchema;
+ this.projection = projection;
this.filters = filters != null ? filters : Collections.emptyList();
this.branch = readConf.branch();
this.scanReportSupplier = scanReportSupplier;
@@ -146,8 +146,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
return caseSensitive;
}
- protected Schema expectedSchema() {
- return expectedSchema;
+ protected Schema projection() {
+ return projection;
}
protected List<Expression> filters() {
@@ -171,19 +171,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
@Override
public Batch toBatch() {
return new SparkBatch(
- sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
+ sparkContext, table, readConf, groupingKeyType(), taskGroups(), projection, hashCode());
}
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
- return new SparkMicroBatchStream(
- sparkContext, table, readConf, expectedSchema, checkpointLocation);
+ return new SparkMicroBatchStream(sparkContext, table, readConf, projection, checkpointLocation);
}
@Override
public StructType readSchema() {
if (readSchema == null) {
- this.readSchema = SparkSchemaUtil.convert(expectedSchema);
+ this.readSchema = SparkSchemaUtil.convert(projection);
}
return readSchema;
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index ab1fb55701..fb84b3e35c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -226,18 +226,18 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
}
private Scan buildBatchScan() {
- Schema expectedSchema = projectionWithMetadataColumns();
+ Schema projection = projectionWithMetadataColumns();
return new SparkBatchQueryScan(
spark(),
table(),
- buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema),
+ buildIcebergBatchScan(false /* not include Column Stats */, projection),
readConf(),
- expectedSchema,
+ projection,
filters(),
metricsReporter()::scanReport);
}
- private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema expectedSchema) {
+ private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema projection) {
Long snapshotId = readConf().snapshotId();
Long asOfTimestamp = readConf().asOfTimestamp();
String branch = readConf().branch();
@@ -278,9 +278,9 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
SparkReadOptions.END_TIMESTAMP);
if (startSnapshotId != null) {
- return buildIncrementalAppendScan(startSnapshotId, endSnapshotId, withStats, expectedSchema);
+ return buildIncrementalAppendScan(startSnapshotId, endSnapshotId, withStats, projection);
} else {
- return buildBatchScan(snapshotId, asOfTimestamp, branch, tag, withStats, expectedSchema);
+ return buildBatchScan(snapshotId, asOfTimestamp, branch, tag, withStats, projection);
}
}
@@ -290,12 +290,12 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
String branch,
String tag,
boolean withStats,
- Schema expectedSchema) {
+ Schema projection) {
BatchScan scan =
newBatchScan()
.caseSensitive(caseSensitive())
.filter(filter())
- .project(expectedSchema)
+ .project(projection)
.metricsReporter(metricsReporter());
if (withStats) {
@@ -322,14 +322,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
}
private org.apache.iceberg.Scan buildIncrementalAppendScan(
- long startSnapshotId, Long endSnapshotId, boolean withStats, Schema expectedSchema) {
+ long startSnapshotId, Long endSnapshotId, boolean withStats, Schema projection) {
IncrementalAppendScan scan =
table()
.newIncrementalAppendScan()
.fromSnapshotExclusive(startSnapshotId)
.caseSensitive(caseSensitive())
.filter(filter())
- .project(expectedSchema)
+ .project(projection)
.metricsReporter(metricsReporter());
if (withStats) {
@@ -398,14 +398,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
}
}
- Schema expectedSchema = projectionWithMetadataColumns();
+ Schema projection = projectionWithMetadataColumns();
IncrementalChangelogScan scan =
table()
.newIncrementalChangelogScan()
.caseSensitive(caseSensitive())
.filter(filter())
- .project(expectedSchema)
+ .project(projection)
.metricsReporter(metricsReporter());
if (startSnapshotId != null) {
@@ -419,7 +419,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
scan = configureSplitPlanning(scan);
return new SparkChangelogScan(
- spark(), table(), scan, readConf(), expectedSchema, filters(), emptyScan);
+ spark(), table(), scan, readConf(), projection, filters(), emptyScan);
}
private Long getStartSnapshotId(Long startTimestamp) {
@@ -482,14 +482,14 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
SparkReadConf adjustedReadConf =
new SparkReadConf(spark(), table(), readConf().branch(), adjustedOptions);
- Schema expectedSchema = projectionWithMetadataColumns();
+ Schema projection = projectionWithMetadataColumns();
BatchScan scan =
newBatchScan()
.useSnapshot(snapshotId)
.caseSensitive(caseSensitive())
.filter(filter())
- .project(expectedSchema)
+ .project(projection)
.metricsReporter(metricsReporter());
scan = configureSplitPlanning(scan);
@@ -499,7 +499,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
table(),
scan,
adjustedReadConf,
- expectedSchema,
+ projection,
filters(),
metricsReporter()::scanReport);
}
@@ -517,7 +517,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
metricsReporter()::scanReport);
}
- Schema expectedSchema = projectionWithMetadataColumns();
+ Schema projection = projectionWithMetadataColumns();
BatchScan scan =
newBatchScan()
@@ -525,7 +525,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
.ignoreResiduals()
.caseSensitive(caseSensitive())
.filter(filter())
- .project(expectedSchema)
+ .project(projection)
.metricsReporter(metricsReporter());
scan = configureSplitPlanning(scan);
@@ -536,7 +536,7 @@ public class SparkScanBuilder extends BaseSparkScanBuilder
scan,
snapshot,
readConf(),
- expectedSchema,
+ projection,
filters(),
metricsReporter()::scanReport);
}