This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-non-deterministic-filter
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit da01c53b1b9625b83b1cfc3a2b8d92e1fae8ddd9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jul 18 10:58:54 2025 -0700

    WIP
---
 .../analysis/HoodiePruneFileSourcePartitions.scala |  4 +--
 .../sql/hudi/TestNonDeterministicFilters.scala     | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 26ef2e0188c8..5163f60f4ebd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, ExpressionSet, NamedExpression, PredicateHelper, 
SubqueryExpression}
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.planning.ScanOperation
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, 
LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
 case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends 
Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case op @ PhysicalOperation(projects, filters, lr @ 
LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
+    case op @ ScanOperation(projects, filters, lr @ 
LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
       if !fileIndex.hasPredicatesPushedDown =>
 
       val deterministicFilters = filters.filter(f => f.deterministic && 
!SubqueryExpression.hasSubquery(f))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala
new file mode 100644
index 000000000000..024c20770d3f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala
@@ -0,0 +1,35 @@
+package org.apache.spark.sql.hudi
+
+class TestNonDeterministicFilters extends HoodieSparkSqlTestBase {
+  test("Test Non deterministic filters") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price int,
+           |  ts int
+           |) using hudi
+           | location '$tablePath'
+           | partitioned by (year string)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"""insert into $tableName values (1, 'n1', 10, 111, '2019')""")
+      spark.sql(s"""insert into $tableName values (2, 'n2', 20, 222, '2020')""")
+      spark.sql(s"""insert into $tableName values (3, 'n3', 30, 333, '2020')""")
+
+      // rand() makes the row count random, so only assert that every returned row is a year='2020' row (id 2 or 3)
+      val rows = spark.sql(s"select * from $tableName where year = '2020' and rand() > 0.01").collect()
+      assert(rows.forall(r => r.getAs[String]("year") == "2020" && Set(2, 3).contains(r.getAs[Int]("id"))))
+    }
+  }
+}

Reply via email to