This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-non-deterministic-filter
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit da01c53b1b9625b83b1cfc3a2b8d92e1fae8ddd9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jul 18 10:58:54 2025 -0700

    WIP
---
 .../analysis/HoodiePruneFileSourcePartitions.scala | 4 +--
 .../sql/hudi/TestNonDeterministicFilters.scala     | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 26ef2e0188c8..5163f60f4ebd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, PredicateHelper, SubqueryExpression}
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
 case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
+    case op @ ScanOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
       if !fileIndex.hasPredicatesPushedDown =>
 
       val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala
new file mode 100644
index 000000000000..024c20770d3f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNonDeterministicFilters.scala
@@ -0,0 +1,35 @@
+package org.apache.spark.sql.hudi
+
+class TestNonDeterministicFilters extends HoodieSparkSqlTestBase {
+  test("Test Non deterministic filters") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price int,
+           |  ts int
+           |) using hudi
+           | location '$tablePath'
+           | partitioned by (year string)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"""insert into $tableName values (1, 'n1', 10, 111, '2019')""")
+      spark.sql(s"""insert into $tableName values (2, 'n2', 20, 222, '2020')""")
+      spark.sql(s"""insert into $tableName values (3, 'n3', 30, 333, '2020')""")
+
+      val df = spark.sql(s"select * from $tableName where year = '2020' and rand() > 0.01")
+      df.show()
+      df.collect()
+    }
+  }
+}
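For reviewers, a minimal sketch (not part of the commit) of how the change can be exercised interactively in a Spark session. The table and column names mirror the test above and are illustrative only; the expectation is that the deterministic partition predicate still reaches HoodieFileIndex for pruning while the non-deterministic rand() predicate stays as a post-scan Filter.

    // Minimal sketch, assuming a Hudi table partitioned by `year` already exists.
    val df = spark.sql("select * from hudi_tbl where year = '2020' and rand() > 0.01")
    // Inspect the analyzed/optimized/physical plans to confirm partition pruning
    // still happens with the ScanOperation-based match.
    df.explain(true)
    df.show()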
