This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 109247f [SPARK-35985][SQL] push partitionFilters for empty readDataSchema 109247f is described below commit 109247f02e9c8d4da8457a8fa490d5091a730d14 Author: Steven Aerts <steven.ae...@airties.com> AuthorDate: Fri Jul 16 04:52:46 2021 +0000 [SPARK-35985][SQL] push partitionFilters for empty readDataSchema This commit makes sure that for File Source V2 partition filters are also taken into account when the readDataSchema is empty. This is the case for queries like: SELECT count(*) FROM tbl WHERE partition=foo SELECT input_file_name() FROM tbl WHERE partition=foo ### What changes were proposed in this pull request? As described in SPARK-35985 there is a bug in the File Datasource V2 which prevents it from pushing down to the FileScanner for queries like the ones listed above. ### Why are the changes needed? If partition filters are not pushed down, the whole dataset will be scanned while only one partition is interesting. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? An extra test was added which relies on the output of explain, as is done in other places. Closes #33191 from steven-aerts/SPARK-35985. 
Authored-by: Steven Aerts <steven.ae...@airties.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit f06aa4a3f34d89e7821ffee19a43a02773dc52e1) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../datasources/PruneFileSourcePartitions.scala | 2 +- .../execution/PruneFileSourcePartitionsSuite.scala | 21 +++++++++++++++++++-- .../hive/execution/PrunePartitionSuiteBase.scala | 4 +++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index a88dfb2..0927027 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -120,7 +120,7 @@ private[sql] object PruneFileSourcePartitions case op @ PhysicalOperation(projects, filters, v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output)) - if filters.nonEmpty && scan.readDataSchema.nonEmpty => + if filters.nonEmpty => val (partitionKeyFilters, dataFilters) = getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation, scan.readPartitionSchema, filters, output) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index ab37645b..a16545a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, 
HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.internal.SQLConf @@ -109,9 +110,25 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase { } } + test("SPARK-35985 push filters for empty read schema") { + // Force datasource v2 for parquet + withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) { + withTempPath { dir => + spark.range(10).selectExpr("id", "id % 3 as p") + .write.partitionBy("p").parquet(dir.getCanonicalPath) + withTempView("tmp") { + spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp"); + assertPrunedPartitions("SELECT COUNT(*) FROM tmp WHERE p = 0", 1, "(tmp.p = 0)") + assertPrunedPartitions("SELECT input_file_name() FROM tmp WHERE p = 0", 1, "(tmp.p = 0)") + } + } + } + } + override def getScanExecPartitionSize(plan: SparkPlan): Long = { plan.collectFirst { - case p: FileSourceScanExec => p - }.get.selectedPartitions.length + case p: FileSourceScanExec => p.selectedPartitions.length + case b: BatchScanExec => b.partitions.size + }.get } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala index 3b13cee..2a690a8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.StatisticsCollectionTestBase import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryOperator, Expression, IsNotNull, Literal} import 
org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED @@ -94,9 +95,10 @@ abstract class PrunePartitionSuiteBase extends StatisticsCollectionTestBase with val plan = qe.sparkPlan assert(getScanExecPartitionSize(plan) == expectedPartitionCount) - val pushedDownPartitionFilters = qe.executedPlan.collectFirst { + val pushedDownPartitionFilters = plan.collectFirst { case scan: FileSourceScanExec => scan.partitionFilters case scan: HiveTableScanExec => scan.partitionPruningPred + case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull])) val pushedFilters = pushedDownPartitionFilters.map(filters => { filters.foldLeft("")((currentStr, exp) => { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org