This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bc8f264be1 Spark: Backport migrate SparkCopyOnWriteScan to 
SupportsRuntimeV2Filtering (#16303)
bc8f264be1 is described below

commit bc8f264be1a77f5979616d582fa121c681b87ca7
Author: drexler-sky <[email protected]>
AuthorDate: Wed May 13 23:30:33 2026 -0700

    Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering 
(#16303)
    
    * Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering
    
    * trigger build
    
    * Update 
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
    
    * Update 
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
    
    * Update 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
    
    * add back import
    
    * spark 3.4: SupportsRuntimeFiltering -> SupportsRuntimeV2Filtering
    
    ---------
    
    Co-authored-by: Kevin Liu <[email protected]>
    Co-authored-by: Kevin Liu <[email protected]>
---
 .../RowLevelCommandDynamicPruning.scala            |  4 +-
 .../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
 .../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
 .../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
 4 files changed, 116 insertions(+), 50 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
 
b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
index f8acef9fe3..6766ad338b 100644
--- 
a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
+++ 
b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
@@ -45,7 +45,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.trees.TreePattern.SORT
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits
@@ -67,7 +67,7 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession)
     // apply special dynamic filtering only for plans that don't support deltas
     case RewrittenRowLevelCommand(
           command: RowLevelCommand,
-          DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _, _, _),
+          DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, 
_),
           rewritePlan: ReplaceIcebergData)
         if conf.dynamicPartitionPruningEnabled && isCandidate(command) =>
 
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index dbf5d455b9..9674a7333f 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -39,11 +39,11 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
 import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
@@ -52,7 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
-    implements SupportsRuntimeFiltering {
+    implements SupportsRuntimeV2Filtering {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
 
@@ -118,7 +118,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
   }
 
   @Override
-  public void filter(Filter[] filters) {
+  public void filter(Predicate[] predicates) {
     Preconditions.checkState(
         Objects.equals(snapshotId(), currentSnapshotId()),
         "Runtime file filtering is not possible: the table has been 
concurrently modified. "
@@ -128,16 +128,10 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
         snapshotId(),
         currentSnapshotId());
 
-    for (Filter filter : filters) {
-      // Spark can only pass In filters at the moment
-      if (filter instanceof In
-          && ((In) 
filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
-        In in = (In) filter;
-
-        Set<String> fileLocations = Sets.newHashSet();
-        for (Object value : in.values()) {
-          fileLocations.add((String) value);
-        }
+    for (Predicate predicate : predicates) {
+      // Spark can only pass IN predicates at the moment
+      if (isFilePathInPredicate(predicate)) {
+        Set<String> fileLocations = extractStringLiterals(predicate);
 
         // Spark may call this multiple times for UPDATEs with subqueries
         // as such cases are rewritten using UNION and the same scan on both 
sides
@@ -159,7 +153,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
           resetTasks(filteredTasks);
         }
       } else {
-        LOG.warn("Unsupported runtime filter {}", filter);
+        LOG.warn("Unsupported runtime filter {}", predicate);
       }
     }
   }
@@ -228,4 +222,32 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
             || 
field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
     return hasLineageFieldName && field.metadata().contains("__metadata_col");
   }
+
+  private static boolean isFilePathInPredicate(Predicate predicate) {
+    if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+      return false;
+    }
+
+    if (!(predicate.children()[0] instanceof NamedReference)) {
+      return false;
+    }
+
+    String[] fieldNames = ((NamedReference) 
predicate.children()[0]).fieldNames();
+
+    return fieldNames.length == 1
+        && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+  }
+
+  private static Set<String> extractStringLiterals(Predicate predicate) {
+    Set<String> values = Sets.newHashSet();
+    for (int i = 1; i < predicate.children().length; i++) {
+      if (predicate.children()[i] instanceof Literal) {
+        Object value = ((Literal<?>) predicate.children()[i]).value();
+        // V2 string literals come through as UTF8String; toString() 
materializes the Java String
+        values.add(value.toString());
+      }
+    }
+
+    return values;
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index dbf5d455b9..9674a7333f 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -39,11 +39,11 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
 import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
@@ -52,7 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
-    implements SupportsRuntimeFiltering {
+    implements SupportsRuntimeV2Filtering {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
 
@@ -118,7 +118,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
   }
 
   @Override
-  public void filter(Filter[] filters) {
+  public void filter(Predicate[] predicates) {
     Preconditions.checkState(
         Objects.equals(snapshotId(), currentSnapshotId()),
         "Runtime file filtering is not possible: the table has been 
concurrently modified. "
@@ -128,16 +128,10 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
         snapshotId(),
         currentSnapshotId());
 
-    for (Filter filter : filters) {
-      // Spark can only pass In filters at the moment
-      if (filter instanceof In
-          && ((In) 
filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
-        In in = (In) filter;
-
-        Set<String> fileLocations = Sets.newHashSet();
-        for (Object value : in.values()) {
-          fileLocations.add((String) value);
-        }
+    for (Predicate predicate : predicates) {
+      // Spark can only pass IN predicates at the moment
+      if (isFilePathInPredicate(predicate)) {
+        Set<String> fileLocations = extractStringLiterals(predicate);
 
         // Spark may call this multiple times for UPDATEs with subqueries
         // as such cases are rewritten using UNION and the same scan on both 
sides
@@ -159,7 +153,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
           resetTasks(filteredTasks);
         }
       } else {
-        LOG.warn("Unsupported runtime filter {}", filter);
+        LOG.warn("Unsupported runtime filter {}", predicate);
       }
     }
   }
@@ -228,4 +222,32 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
             || 
field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
     return hasLineageFieldName && field.metadata().contains("__metadata_col");
   }
+
+  private static boolean isFilePathInPredicate(Predicate predicate) {
+    if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+      return false;
+    }
+
+    if (!(predicate.children()[0] instanceof NamedReference)) {
+      return false;
+    }
+
+    String[] fieldNames = ((NamedReference) 
predicate.children()[0]).fieldNames();
+
+    return fieldNames.length == 1
+        && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+  }
+
+  private static Set<String> extractStringLiterals(Predicate predicate) {
+    Set<String> values = Sets.newHashSet();
+    for (int i = 1; i < predicate.children().length; i++) {
+      if (predicate.children()[i] instanceof Literal) {
+        Object value = ((Literal<?>) predicate.children()[i]).value();
+        // V2 string literals come through as UTF8String; toString() 
materializes the Java String
+        values.add(value.toString());
+      }
+    }
+
+    return values;
+  }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index ee4be24618..f957b97d60 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -38,16 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
 import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
-    implements SupportsRuntimeFiltering {
+    implements SupportsRuntimeV2Filtering {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
 
@@ -103,7 +103,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
   }
 
   @Override
-  public void filter(Filter[] filters) {
+  public void filter(Predicate[] predicates) {
     Preconditions.checkState(
         Objects.equals(snapshotId(), currentSnapshotId()),
         "Runtime file filtering is not possible: the table has been 
concurrently modified. "
@@ -113,16 +113,10 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
         snapshotId(),
         currentSnapshotId());
 
-    for (Filter filter : filters) {
-      // Spark can only pass In filters at the moment
-      if (filter instanceof In
-          && ((In) 
filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
-        In in = (In) filter;
-
-        Set<String> fileLocations = Sets.newHashSet();
-        for (Object value : in.values()) {
-          fileLocations.add((String) value);
-        }
+    for (Predicate predicate : predicates) {
+      // Spark can only pass IN predicates at the moment
+      if (isFilePathInPredicate(predicate)) {
+        Set<String> fileLocations = extractStringLiterals(predicate);
 
         // Spark may call this multiple times for UPDATEs with subqueries
         // as such cases are rewritten using UNION and the same scan on both 
sides
@@ -144,7 +138,7 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
           resetTasks(filteredTasks);
         }
       } else {
-        LOG.warn("Unsupported runtime filter {}", filter);
+        LOG.warn("Unsupported runtime filter {}", predicate);
       }
     }
   }
@@ -188,4 +182,32 @@ class SparkCopyOnWriteScan extends 
SparkPartitioningAwareScan<FileScanTask>
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table(), branch());
     return currentSnapshot != null ? currentSnapshot.snapshotId() : null;
   }
+
+  private static boolean isFilePathInPredicate(Predicate predicate) {
+    if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+      return false;
+    }
+
+    if (!(predicate.children()[0] instanceof NamedReference)) {
+      return false;
+    }
+
+    String[] fieldNames = ((NamedReference) 
predicate.children()[0]).fieldNames();
+
+    return fieldNames.length == 1
+        && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+  }
+
+  private static Set<String> extractStringLiterals(Predicate predicate) {
+    Set<String> values = Sets.newHashSet();
+    for (int i = 1; i < predicate.children().length; i++) {
+      if (predicate.children()[i] instanceof Literal) {
+        Object value = ((Literal<?>) predicate.children()[i]).value();
+        // V2 string literals come through as UTF8String; toString() 
materializes the Java String
+        values.add(value.toString());
+      }
+    }
+
+    return values;
+  }
 }

Reply via email to