This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e46e98d9f35  [SPARK-40045][SQL] Optimize the order of filtering predicates
e46e98d9f35 is described below

commit e46e98d9f357a715b95b75a1b44fc04f9d0ffa17
Author: huaxingao <huaxin_...@apple.com>
AuthorDate: Tue Feb 7 21:04:30 2023 -0800

    [SPARK-40045][SQL] Optimize the order of filtering predicates

    All the credit for this PR goes to caican00. Here is the original
    [PR](https://github.com/apache/spark/pull/37479).

    ### What changes were proposed in this pull request?
    Put the untranslated filters to the right of the translated filters, so the
    translated filters are evaluated first.

    ### Why are the changes needed?
    Normally the translated filters (postScanFilters) are simple filters that can
    be evaluated quickly, while the untranslated filters are complicated filters
    that take more time to evaluate, so we want to evaluate the postScanFilters
    first.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    A new unit test.

    Closes #39892 from huaxingao/filter_order.

    Authored-by: huaxingao <huaxin_...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit fe67269394993d819f447364de04dfc95cd21775)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connector/catalog/InMemoryBaseTable.scala  | 11 ++++-
 .../execution/datasources/v2/PushDownUtils.scala   | 10 ++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 50 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index e7c4c784b98..f169db11085 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -289,7 +289,7 @@ abstract class InMemoryBaseTable(
   }
 
   class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
-    with SupportsPushDownRequiredColumns {
+    with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
     private var schema: StructType = tableSchema
 
     override def build: Scan =
@@ -299,6 +299,15 @@ abstract class InMemoryBaseTable(
       val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
       schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
     }
+
+    private var _pushedFilters: Array[Filter] = Array.empty
+
+    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+      this._pushedFilters = filters
+      this._pushedFilters
+    }
+
+    override def pushedFilters(): Array[Filter] = this._pushedFilters
   }
 
   case class InMemoryStats(
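For context on the change above: implementing SupportsPushDownFilters lets Spark offer the translated filters to the test table's scan builder, and because the builder returns every filter unhandled, they all remain post-scan filters, which is what makes the ordering fixed by this commit observable in FilterExec. A minimal standalone sketch of that contract (the class name and the error thrown in build() are illustrative, not part of the commit):

    import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
    import org.apache.spark.sql.sources.Filter

    // Hypothetical pass-through builder mirroring the InMemoryScanBuilder change:
    // pushFilters receives the filters Spark could translate and returns the
    // subset the source cannot fully evaluate. Returning the whole array keeps
    // every filter in the post-scan list, so Spark must re-check all of them.
    class EchoFilterScanBuilder extends ScanBuilder with SupportsPushDownFilters {
      private var _pushedFilters: Array[Filter] = Array.empty

      override def pushFilters(filters: Array[Filter]): Array[Filter] = {
        _pushedFilters = filters
        _pushedFilters // nothing is fully handled by the source
      }

      override def pushedFilters(): Array[Filter] = _pushedFilters

      override def build(): Scan =
        throw new UnsupportedOperationException("sketch only")
    }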
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index eb37a27fd7c..fe19ac552f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -66,7 +66,10 @@ object PushDownUtils {
       val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
         DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
       }
-      (Left(r.pushedFilters()), (untranslatableExprs ++ postScanFilters).toSeq)
+      // Normally the translated filters (postScanFilters) are simple filters that can be
+      // evaluated quickly, while the untranslated filters are complicated filters that take
+      // more time to evaluate, so we want to evaluate the postScanFilters first.
+      (Left(r.pushedFilters()), (postScanFilters ++ untranslatableExprs).toSeq)
 
     case r: SupportsPushDownV2Filters =>
       // A map from translated data source leaf node filters to original catalyst filter
@@ -95,7 +98,10 @@ object PushDownUtils {
       val postScanFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
         DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
       }
-      (Right(r.pushedPredicates), (untranslatableExprs ++ postScanFilters).toSeq)
+      // Normally the translated filters (postScanFilters) are simple filters that can be
+      // evaluated quickly, while the untranslated filters are complicated filters that take
+      // more time to evaluate, so we want to evaluate the postScanFilters first.
+      (Right(r.pushedPredicates), (postScanFilters ++ untranslatableExprs).toSeq)
 
     case f: FileScanBuilder =>
       val postScanFilters = f.pushFilters(filters)
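The reordering above matters because a conjunction of post-scan filters is evaluated left to right with short-circuiting, so a cheap predicate placed first can reject a row before an expensive one ever runs. A self-contained sketch of the effect (names and data are illustrative; no Spark required):

    // Scala's && short-circuits just like the evaluated filter condition does:
    // with the cheap predicate first, the expensive stand-in for a UDF runs
    // only for rows that survive it.
    object PredicateOrderSketch extends App {
      var udfCalls = 0
      def expensiveUdf(data: String): Boolean = { udfCalls += 1; data.length == 1 }
      def cheap(id: Long): Boolean = id == 2L

      val rows = Seq((1L, "a"), (2L, "b"), (3L, "c"))

      // Old order (untranslatableExprs ++ postScanFilters): the UDF would run for all 3 rows.
      // New order (postScanFilters ++ untranslatableExprs): it runs only for id = 2.
      val matched = rows.count { case (id, data) => cheap(id) && expensiveUdf(data) }
      println(s"matched=$matched, udfCalls=$udfCalls") // matched=1, udfCalls=1
    }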
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 673d8029c24..38bd24356f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.execution.FilterExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -49,7 +51,8 @@ import org.apache.spark.unsafe.types.UTF8String
 
 abstract class DataSourceV2SQLSuite
   extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
-  with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase {
+  with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
+  with AdaptiveSparkPlanHelper {
 
   protected val v2Source = classOf[FakeV2Provider].getName
   override protected val v2Format = v2Source
@@ -2919,6 +2922,51 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-40045: Move the post-Scan Filters to the far right") {
+    val t1 = s"${catalogAndNamespace}table"
+    withUserDefinedFunction("udfStrLen" -> true) {
+      withTable(t1) {
+        spark.udf.register("udfStrLen", (str: String) => str.length)
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+        sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+        val filterBefore = spark.sql(
+          s"""
+             |SELECT id, data FROM $t1
+             |WHERE udfStrLen(data) = 1
+             |and id = 2
+             |""".stripMargin
+        )
+        val conditionBefore =
+          find(filterBefore.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
+            .head.asInstanceOf[FilterExec]
+            .condition
+        val expressionsBefore = splitConjunctivePredicates(conditionBefore)
+        assert(expressionsBefore.length == 3
+          && expressionsBefore(0).toString.trim.startsWith("isnotnull(id")
+          && expressionsBefore(1).toString.trim.startsWith("(id")
+          && expressionsBefore(2).toString.trim.startsWith("(udfStrLen(data"))
+
+        val filterAfter = spark.sql(
+          s"""
+             |SELECT id, data FROM $t1
+             |WHERE id = 2
+             |and udfStrLen(data) = 1
+             |""".stripMargin
+        )
+        val conditionAfter =
+          find(filterAfter.queryExecution.executedPlan)(_.isInstanceOf[FilterExec])
+            .head.asInstanceOf[FilterExec]
+            .condition
+        val expressionsAfter = splitConjunctivePredicates(conditionAfter)
+        assert(expressionsAfter.length == 3
+          && expressionsAfter(0).toString.trim.startsWith("isnotnull(id")
+          && expressionsAfter(1).toString.trim.startsWith("(id")
+          && expressionsAfter(2).toString.trim.startsWith("(udfStrLen(data"))
+      }
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     checkError(
       exception = intercept[AnalysisException] {
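A note on how the test reads its expectations: splitConjunctivePredicates flattens the And tree of the FilterExec condition left to right, so the resulting sequence mirrors evaluation order, and the assertions check that the udfStrLen conjunct lands last regardless of how the WHERE clause was written. A sketch of that decomposition for reference (Spark's own version lives in PredicateHelper; this re-statement is illustrative, not part of the commit):

    import org.apache.spark.sql.catalyst.expressions.{And, Expression}

    object ConjunctOrderSketch {
      // Flatten an And tree left to right; the resulting Seq preserves the
      // order in which the filter's conjuncts are evaluated.
      def splitConjuncts(condition: Expression): Seq[Expression] = condition match {
        case And(left, right) => splitConjuncts(left) ++ splitConjuncts(right)
        case other => other :: Nil
      }
    }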