This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e1182e2c2a Spark 4.1: Migrate SparkCopyOnWriteScan to
SupportsRuntimeV2Filtering (#16295)
e1182e2c2a is described below
commit e1182e2c2a6f4282167f9c4a71889f107b99a011
Author: drexler-sky <[email protected]>
AuthorDate: Tue May 12 09:32:39 2026 -0700
Spark 4.1: Migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering
(#16295)
---
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 54 +++++++++++++++-------
1 file changed, 38 insertions(+), 16 deletions(-)
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 1892c5812a..b8b3a0edea 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -35,16 +35,16 @@ import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
- implements SupportsRuntimeFiltering {
+ implements SupportsRuntimeV2Filtering {
private static final Logger LOG =
LoggerFactory.getLogger(SparkCopyOnWriteScan.class);
@@ -92,17 +92,11 @@ class SparkCopyOnWriteScan extends
SparkPartitioningAwareScan<FileScanTask>
}
@Override
- public void filter(Filter[] filters) {
- for (Filter filter : filters) {
- // Spark can only pass In filters at the moment
- if (filter instanceof In
- && ((In)
filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
- In in = (In) filter;
-
- Set<String> fileLocations = Sets.newHashSet();
- for (Object value : in.values()) {
- fileLocations.add((String) value);
- }
+ public void filter(Predicate[] predicates) {
+ for (Predicate predicate : predicates) {
+ // Spark can only pass IN predicates at the moment
+ if (isFilePathInPredicate(predicate)) {
+ Set<String> fileLocations = extractStringLiterals(predicate);
// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both
sides
@@ -124,7 +118,7 @@ class SparkCopyOnWriteScan extends
SparkPartitioningAwareScan<FileScanTask>
resetTasks(filteredTasks);
}
} else {
- LOG.warn("Unsupported runtime filter {}", filter);
+ LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
@@ -167,4 +161,32 @@ class SparkCopyOnWriteScan extends
SparkPartitioningAwareScan<FileScanTask>
"IcebergCopyOnWriteScan(table=%s, schemaId=%s, snapshotId=%s,
branch=%s, filters=%s, groupedBy=%s)",
table(), schema().schemaId(), snapshotId(), branch, filtersDesc(),
groupingKeyDesc());
}
+
+ private static boolean isFilePathInPredicate(Predicate predicate) {
+ if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
+ return false;
+ }
+
+ if (!(predicate.children()[0] instanceof NamedReference)) {
+ return false;
+ }
+
+ String[] fieldNames = ((NamedReference)
predicate.children()[0]).fieldNames();
+
+ return fieldNames.length == 1
+ && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
+ }
+
+ private static Set<String> extractStringLiterals(Predicate predicate) {
+ Set<String> values = Sets.newHashSet();
+ for (int i = 1; i < predicate.children().length; i++) {
+ if (predicate.children()[i] instanceof Literal) {
+ Object value = ((Literal<?>) predicate.children()[i]).value();
+ // V2 string literals come through as UTF8String; toString()
materializes the Java String
+ values.add(value.toString());
+ }
+ }
+
+ return values;
+ }
}