This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 881417433a Spark 4.1: Use scan filter for conflict detection (#15365)
881417433a is described below
commit 881417433a881fbe832f338c31e4d6278625e018
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 18 23:19:22 2026 -0800
Spark 4.1: Use scan filter for conflict detection (#15365)
---
.../org/apache/iceberg/spark/source/PlanUtils.java | 2 +-
.../iceberg/spark/source/SparkPositionDeltaWrite.java | 13 +------------
.../org/apache/iceberg/spark/source/SparkScan.java | 17 +++++++++++------
.../org/apache/iceberg/spark/source/SparkWrite.java | 18 ++----------------
4 files changed, 15 insertions(+), 35 deletions(-)
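In short: SparkWrite and SparkPositionDeltaWrite each carried a private
conflictDetectionFilter() helper that AND-folded the scan's pushed-down filter
expressions in a loop. This commit removes both copies and centralizes the fold
in a new SparkScan#filter() accessor (filterExpressions() is renamed to
filters() along the way). A minimal standalone sketch of the equivalence; the
filter values below are hypothetical, chosen only for illustration:

    import java.util.List;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class FilterFoldSketch {
      public static void main(String[] args) {
        // Hypothetical pushed-down scan filters; an empty list is also valid.
        List<Expression> filters =
            List.of(Expressions.equal("region", "eu"), Expressions.greaterThan("ts", 100L));

        // Old style: the loop removed from both write classes.
        Expression viaLoop = Expressions.alwaysTrue();
        for (Expression expr : filters) {
          viaLoop = Expressions.and(viaLoop, expr);
        }

        // New style: the reduce in SparkScan#filter(). The alwaysTrue()
        // identity means an empty filter list collapses to alwaysTrue(),
        // exactly as the loop did.
        Expression viaReduce =
            filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

        System.out.println(viaLoop);
        System.out.println(viaReduce);
      }
    }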
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
index 148717e142..18fc63fc61 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
@@ -49,7 +49,7 @@ public class PlanUtils {
}
SparkBatchQueryScan batchQueryScan = (SparkBatchQueryScan)
scanRelation.scan();
- return batchQueryScan.filterExpressions().stream();
+ return batchQueryScan.filters().stream();
})
.collect(Collectors.toList());
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 6f091eb2a4..d3b27245b6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -52,7 +52,6 @@ import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
@@ -247,7 +246,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
// the scan may be null if the optimizer replaces it with an empty relation
// no validation is needed in this case as the command is independent of the table state
if (scan != null) {
- Expression conflictDetectionFilter = conflictDetectionFilter(scan);
+ Expression conflictDetectionFilter = scan.filter();
rowDelta.conflictDetectionFilter(conflictDetectionFilter);
rowDelta.validateDataFilesExist(referencedDataFiles);
@@ -291,16 +290,6 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
}
}
- private Expression conflictDetectionFilter(SparkBatchQueryScan queryScan) {
- Expression filter = Expressions.alwaysTrue();
-
- for (Expression expr : queryScan.filterExpressions()) {
- filter = Expressions.and(filter, expr);
- }
-
- return filter;
- }
-
@Override
public void abort(WriterCommitMessage[] messages) {
if (cleanupOnAbort) {
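The position-delta commit path now feeds that collapsed expression straight
into the row-delta validation API. A hedged sketch of the wiring, using only
the calls visible in the diff above (table, scan, and referencedDataFiles are
assumed to be in scope; this is not the full commit logic):

    // Sketch: conflict detection for a position-delta (MERGE/UPDATE/DELETE) write.
    RowDelta rowDelta = table.newRowDelta();
    rowDelta.conflictDetectionFilter(scan.filter()); // conjunction of pushed-down filters
    rowDelta.validateDataFilesExist(referencedDataFiles);
    rowDelta.commit();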
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 14fe80c43d..89c236e410 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -105,7 +106,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
private final SparkReadConf readConf;
private final boolean caseSensitive;
private final Schema expectedSchema;
- private final List<Expression> filterExpressions;
+ private final List<Expression> filters;
private final String branch;
private final Supplier<ScanReport> scanReportSupplier;
@@ -128,7 +129,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = expectedSchema;
- this.filterExpressions = filters != null ? filters : Collections.emptyList();
+ this.filters = filters != null ? filters : Collections.emptyList();
this.branch = readConf.branch();
this.scanReportSupplier = scanReportSupplier;
}
@@ -149,12 +150,16 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
return expectedSchema;
}
- protected List<Expression> filterExpressions() {
- return filterExpressions;
+ protected List<Expression> filters() {
+ return filters;
+ }
+
+ protected Expression filter() {
+ return filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
}
protected String filtersDesc() {
- return Spark3Util.describe(filterExpressions);
+ return Spark3Util.describe(filters);
}
protected Types.StructType groupingKeyType() {
@@ -240,7 +245,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
// estimate stats using snapshot summary only for partitioned tables
// (metadata tables are unpartitioned)
- if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
+ if (!table.spec().isUnpartitioned() && filters.isEmpty()) {
LOG.debug(
"Using snapshot {} metadata to estimate statistics for table {}",
snapshot.snapshotId(),
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e0a05ff11a..5a01e5d7bf 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
@@ -428,19 +427,6 @@ abstract class SparkWrite extends BaseSparkWrite implements Write, RequiresDistr
}
}
- private Expression conflictDetectionFilter() {
- // the list of filter expressions may be empty but is never null
- List<Expression> scanFilterExpressions = scan.filterExpressions();
-
- Expression filter = Expressions.alwaysTrue();
-
- for (Expression expr : scanFilterExpressions) {
- filter = Expressions.and(filter, expr);
- }
-
- return filter;
- }
-
@Override
public void commit(WriterCommitMessage[] messages) {
commit(messages, null);
@@ -496,7 +482,7 @@ abstract class SparkWrite extends BaseSparkWrite implements Write, RequiresDistr
overwriteFiles.validateFromSnapshot(scanSnapshotId);
}
- Expression conflictDetectionFilter = conflictDetectionFilter();
+ Expression conflictDetectionFilter = scan.filter();
overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
overwriteFiles.validateNoConflictingData();
overwriteFiles.validateNoConflictingDeletes();
@@ -522,7 +508,7 @@ abstract class SparkWrite extends BaseSparkWrite implements Write, RequiresDistr
overwriteFiles.validateFromSnapshot(scanSnapshotId);
}
- Expression conflictDetectionFilter = conflictDetectionFilter();
+ Expression conflictDetectionFilter = scan.filter();
overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
overwriteFiles.validateNoConflictingDeletes();
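For the overwrite path, the same one-line call replaces the removed helper in
both validation branches shown above. A hedged sketch of how the filter is
consumed under serializable isolation (table, scan, and scanSnapshotId are
assumed to be in scope, per the surrounding diff):

    // Sketch: validate no conflicting data or deletes were added since the scan.
    OverwriteFiles overwriteFiles = table.newOverwrite();
    overwriteFiles.validateFromSnapshot(scanSnapshotId);
    overwriteFiles.conflictDetectionFilter(scan.filter());
    overwriteFiles.validateNoConflictingData();
    overwriteFiles.validateNoConflictingDeletes();
    overwriteFiles.commit();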