This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 881417433a Spark 4.1: Use scan filter for conflict detection (#15365)
881417433a is described below

commit 881417433a881fbe832f338c31e4d6278625e018
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 18 23:19:22 2026 -0800

    Spark 4.1: Use scan filter for conflict detection (#15365)
---
 .../org/apache/iceberg/spark/source/PlanUtils.java     |  2 +-
 .../iceberg/spark/source/SparkPositionDeltaWrite.java  | 13 +------------
 .../org/apache/iceberg/spark/source/SparkScan.java     | 17 +++++++++++------
 .../org/apache/iceberg/spark/source/SparkWrite.java    | 18 ++----------------
 4 files changed, 15 insertions(+), 35 deletions(-)

diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
index 148717e142..18fc63fc61 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/PlanUtils.java
@@ -49,7 +49,7 @@ public class PlanUtils {
               }
 
               SparkBatchQueryScan batchQueryScan = (SparkBatchQueryScan) 
scanRelation.scan();
-              return batchQueryScan.filterExpressions().stream();
+              return batchQueryScan.filters().stream();
             })
         .collect(Collectors.toList());
   }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 6f091eb2a4..d3b27245b6 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -52,7 +52,6 @@ import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BasePositionDeltaWriter;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
@@ -247,7 +246,7 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
       // the scan may be null if the optimizer replaces it with an empty 
relation
       // no validation is needed in this case as the command is independent of 
the table state
       if (scan != null) {
-        Expression conflictDetectionFilter = conflictDetectionFilter(scan);
+        Expression conflictDetectionFilter = scan.filter();
         rowDelta.conflictDetectionFilter(conflictDetectionFilter);
 
         rowDelta.validateDataFilesExist(referencedDataFiles);
@@ -291,16 +290,6 @@ class SparkPositionDeltaWrite extends BaseSparkWrite
       }
     }
 
-    private Expression conflictDetectionFilter(SparkBatchQueryScan queryScan) {
-      Expression filter = Expressions.alwaysTrue();
-
-      for (Expression expr : queryScan.filterExpressions()) {
-        filter = Expressions.and(filter, expr);
-      }
-
-      return filter;
-    }
-
     @Override
     public void abort(WriterCommitMessage[] messages) {
       if (cleanupOnAbort) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 14fe80c43d..89c236e410 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -105,7 +106,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
   private final SparkReadConf readConf;
   private final boolean caseSensitive;
   private final Schema expectedSchema;
-  private final List<Expression> filterExpressions;
+  private final List<Expression> filters;
   private final String branch;
   private final Supplier<ScanReport> scanReportSupplier;
 
@@ -128,7 +129,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
     this.readConf = readConf;
     this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = expectedSchema;
-    this.filterExpressions = filters != null ? filters : 
Collections.emptyList();
+    this.filters = filters != null ? filters : Collections.emptyList();
     this.branch = readConf.branch();
     this.scanReportSupplier = scanReportSupplier;
   }
@@ -149,12 +150,16 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
     return expectedSchema;
   }
 
-  protected List<Expression> filterExpressions() {
-    return filterExpressions;
+  protected List<Expression> filters() {
+    return filters;
+  }
+
+  protected Expression filter() {
+    return filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
   }
 
   protected String filtersDesc() {
-    return Spark3Util.describe(filterExpressions);
+    return Spark3Util.describe(filters);
   }
 
   protected Types.StructType groupingKeyType() {
@@ -240,7 +245,7 @@ abstract class SparkScan implements Scan, 
SupportsReportStatistics {
 
     // estimate stats using snapshot summary only for partitioned tables
     // (metadata tables are unpartitioned)
-    if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
+    if (!table.spec().isUnpartitioned() && filters.isEmpty()) {
       LOG.debug(
           "Using snapshot {} metadata to estimate statistics for table {}",
           snapshot.snapshotId(),
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e0a05ff11a..5a01e5d7bf 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
@@ -428,19 +427,6 @@ abstract class SparkWrite extends BaseSparkWrite 
implements Write, RequiresDistr
       }
     }
 
-    private Expression conflictDetectionFilter() {
-      // the list of filter expressions may be empty but is never null
-      List<Expression> scanFilterExpressions = scan.filterExpressions();
-
-      Expression filter = Expressions.alwaysTrue();
-
-      for (Expression expr : scanFilterExpressions) {
-        filter = Expressions.and(filter, expr);
-      }
-
-      return filter;
-    }
-
     @Override
     public void commit(WriterCommitMessage[] messages) {
       commit(messages, null);
@@ -496,7 +482,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
         overwriteFiles.validateFromSnapshot(scanSnapshotId);
       }
 
-      Expression conflictDetectionFilter = conflictDetectionFilter();
+      Expression conflictDetectionFilter = scan.filter();
       overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
       overwriteFiles.validateNoConflictingData();
       overwriteFiles.validateNoConflictingDeletes();
@@ -522,7 +508,7 @@ abstract class SparkWrite extends BaseSparkWrite implements 
Write, RequiresDistr
         overwriteFiles.validateFromSnapshot(scanSnapshotId);
       }
 
-      Expression conflictDetectionFilter = conflictDetectionFilter();
+      Expression conflictDetectionFilter = scan.filter();
       overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
       overwriteFiles.validateNoConflictingDeletes();
 

Reply via email to