This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4aec9d7  [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col
4aec9d7 is described below

commit 4aec9d7daca7a1a146ff1fb1e7541c9443905725
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Wed Oct 27 00:14:00 2021 -0700

    [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col

    ### What changes were proposed in this pull request?
    I just realized that with the changes in https://github.com/apache/spark/pull/33650, the restriction against pushing down Min/Max/Count when the filter is on a partition column was already removed. This PR adds a test to make sure Min/Max/Count for Parquet are pushed down if the filter is on a partition column.

    ### Why are the changes needed?
    To complete the work on Aggregate (Min/Max/Count) push down for Parquet.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    New test.

    Closes #34248 from huaxingao/partitionFilter.

    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../v2/parquet/ParquetScanBuilder.scala            |  7 ++--
 .../parquet/ParquetAggregatePushDownSuite.scala    | 40 ++++++++++++++++------
 2 files changed, 33 insertions(+), 14 deletions(-)
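[Editor's note: for readers skimming the patch, here is a minimal sketch, not part of the commit, of the scenario the new test exercises. It assumes an active SparkSession `spark` (e.g. in spark-shell); the output path is hypothetical.]

    import org.apache.spark.sql.internal.SQLConf

    // Write a small Parquet table partitioned on `p`, mirroring the test setup below.
    val path = "/tmp/agg_pushdown_demo"  // hypothetical location
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").parquet(path)

    // Enable Parquet aggregate pushdown (off by default).
    spark.conf.set(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key, "true")
    spark.read.parquet(path).createOrReplaceTempView("tmp")

    // The filter touches only the partition column `p`, so MAX/MIN/COUNT can be
    // answered from Parquet footer statistics rather than by scanning row data.
    spark.sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0").show()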
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 113438a..da49381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -134,10 +134,9 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // Todo: 1. add support if groupby column is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36646)
-      //       2. add support if filter col is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      // However, if the filter is on partition column, max/min/count can still be pushed down
+      // Todo: add support if groupby column is partition col
+      //       (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
     }
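[Editor's note: to make the distinction in the comment above concrete, a rough sketch of which query shapes are affected. Assumptions: `t` is a plain, non-partitioned Parquet table with columns col1/col2 as in the comment's examples, and `tmp` is the partitioned view from the sketch above.]

    // Aggregate combined with a data filter or a GROUP BY on a regular column:
    // COUNT(col1) is NOT pushed down in either case.
    spark.sql("SELECT COUNT(col1) FROM t WHERE col2 = 8")
    spark.sql("SELECT COUNT(col1) FROM t GROUP BY col2")  // SPARK-36646 tracks GROUP BY on a partition col

    // Filter on a partition column only: after this change the aggregate
    // remains eligible for pushdown.
    spark.sql("SELECT COUNT(id) FROM tmp WHERE p = 0")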
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index 0ae95db..77ecd28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val count = sql("SELECT COUNT(p) FROM tmp")
             count.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
     withParquetTable(data, "t") {
@@ -240,6 +239,29 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
+  test("aggregate push down - aggregate with partition filter can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").parquet(dir.getCanonicalPath)
+      withTempView("tmp") {
+        spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val max = sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
+            max.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]"
+                checkKeywordsExistsInExplain(max, expected_plan_fragment)
+            }
+            checkAnswer(max, Seq(Row(9, 0, 4)))
+          }
+        }
+      }
+    }
+  }
+
   test("aggregate push down - push down only if all the aggregates can be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
@@ -356,10 +378,9 @@ abstract class ParquetAggregatePushDownSuite
       spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
       withTempView("test") {
         spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
               "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
@@ -474,10 +495,9 @@ abstract class ParquetAggregatePushDownSuite
   }
 
   test("aggregate push down - column name case sensitivity") {
-    val enableVectorizedReader = Seq("false", "true")
-    for (testVectorizedReader <- enableVectorizedReader) {
+    Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-        vectorizedReaderEnabledKey -> testVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader) {
         withTempPath { dir =>
           spark.range(10).selectExpr("id", "id % 3 as p")
             .write.partitionBy("p").parquet(dir.getCanonicalPath)
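[Editor's note: as a quick interactive check, a sketch mirroring the assertion in the new test; it reuses the `tmp` view from the first sketch. With pushdown active, the scan node of the explain output reports the pushed aggregates.]

    val df = spark.sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
    df.explain()
    // The scan node should contain a fragment like:
    //   PushedAggregation: [MAX(id), MIN(id), COUNT(id)]
    // Had the query used a data filter or GROUP BY instead, this entry would be
    // absent and the aggregate would be evaluated by Spark after the scan.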