This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ed6e7b  [SPARK-36677][SQL] NestedColumnAliasing should not push down 
aggregate functions into projections
2ed6e7b is described below

commit 2ed6e7bc5de7811627de919398865eebb0cfa7cf
Author: Venkata Sai Akhil Gudesa <venkata.gud...@databricks.com>
AuthorDate: Tue Sep 7 18:15:48 2021 -0700

    [SPARK-36677][SQL] NestedColumnAliasing should not push down aggregate 
functions into projections
    
    ### What changes were proposed in this pull request?
    
    This PR filters out `ExtractValue`s that contain any aggregate function in the
    `NestedColumnAliasing` rule, to prevent aggregations from being pushed down into
    projections.
    
    ### Why are the changes needed?
    
    To handle a missed corner case in `NestedColumnAliasing` that can cause users to
    encounter a runtime exception.
    
    Consider the following schema:
    ```
    root
     |-- a: struct (nullable = true)
     |    |-- c: struct (nullable = true)
     |    |    |-- e: string (nullable = true)
     |    |-- d: integer (nullable = true)
     |-- b: string (nullable = true)
    ```
    and the query:
    `SELECT MAX(a).c.e FROM (SELECT a, b FROM test_aggregates) GROUP BY b`
    
    Executing the query before this PR will result in the error:
    ```
    java.lang.UnsupportedOperationException: Cannot generate code for 
expression: max(input[0, struct<c:struct<e:string>,d:int>, true])
      at 
org.apache.spark.sql.errors.QueryExecutionErrors$.cannotGenerateCodeForExpressionError(QueryExecutionErrors.scala:83)
      at 
org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode(Expression.scala:312)
      at 
org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode$(Expression.scala:311)
      at 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:99)
    ...
    ```
    The optimized plan before this PR is:
    
    ```
    'Aggregate [b#1], [_extract_e#5 AS max(a).c.e#3]
    +- 'Project [max(a#0).c.e AS _extract_e#5, b#1]
       +- Relation default.test_aggregates[a#0,b#1] parquet
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new unit test in `NestedColumnAliasingSuite` that reproduces the query above.
    The produced optimized plan is checked for equivalence with a plan of the form:
    ```
    Aggregate [b#452], [max(a#451).c.e AS max('a)[c][e]#456]
    +- LocalRelation <empty>, [a#451, b#452]
    ```
    
    Closes #33921 from vicennial/spark-36677.
    
    Authored-by: Venkata Sai Akhil Gudesa <venkata.gud...@databricks.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  | 15 ++++++++++--
 .../optimizer/NestedColumnAliasingSuite.scala      | 27 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 9facae3b..e2553f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -21,6 +21,7 @@ import scala.collection
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -258,6 +259,13 @@ object NestedColumnAliasing {
       .filter(!_.references.subsetOf(exclusiveAttrSet))
       .groupBy(_.references.head.canonicalized.asInstanceOf[Attribute])
       .flatMap { case (attr: Attribute, nestedFields: 
collection.Seq[ExtractValue]) =>
+
+        // Check if `ExtractValue` expressions contain any aggregate functions 
in their tree. Those
+        // that do should not have an alias generated as it can lead to 
pushing the aggregate down
+        // into a projection.
+        def containsAggregateFunction(ev: ExtractValue): Boolean =
+          ev.find(_.isInstanceOf[AggregateFunction]).isDefined
+
         // Remove redundant [[ExtractValue]]s if they share the same parent 
nest field.
         // For example, when `a.b` and `a.b.c` are in project list, we only 
need to alias `a.b`.
         // Because `a.b` requires all of the inner fields of `b`, we cannot 
prune `a.b.c`.
@@ -268,7 +276,10 @@ object NestedColumnAliasing {
             val child = e.children.head
             nestedFields.forall(f => child.find(_.semanticEquals(f)).isEmpty)
           case _ => true
-        }.distinct
+        }
+          .distinct
+          // Discard [[ExtractValue]]s that contain aggregate functions.
+          .filterNot(containsAggregateFunction)
 
         // If all nested fields of `attr` are used, we don't need to introduce 
new aliases.
         // By default, the [[ColumnPruning]] rule uses `attr` already.
@@ -276,7 +287,7 @@ object NestedColumnAliasing {
         // nested field once.
         val numUsedNestedFields = 
dedupNestedFields.map(_.canonicalized).distinct
           .map { nestedField => totalFieldNum(nestedField.dataType) }.sum
-        if (numUsedNestedFields < totalFieldNum(attr.dataType)) {
+        if (dedupNestedFields.nonEmpty && numUsedNestedFields < 
totalFieldNum(attr.dataType)) {
           Some((attr, dedupNestedFields.toSeq))
         } else {
           None
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index e49e028..40ab72c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -763,6 +764,32 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
         $"_extract_search_params.col2".as("col2")).analyze
     comparePlans(optimized, query)
   }
+
+  test("SPARK-36677: NestedColumnAliasing should not push down aggregate 
functions into " +
+    "projections") {
+    val nestedRelation = LocalRelation(
+      'a.struct(
+        'c.struct(
+          'e.string),
+        'd.string),
+      'b.string)
+
+    val plan = nestedRelation
+      .select($"a", $"b")
+      .groupBy($"b")(max($"a").getField("c").getField("e"))
+      .analyze
+
+    val optimized = Optimize.execute(plan)
+
+    // The plan should not contain aggregation functions inside the projection
+    SimpleAnalyzer.checkAnalysis(optimized)
+
+    val expected = nestedRelation
+      .groupBy($"b")(max($"a").getField("c").getField("e"))
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
 }
 
 object NestedColumnAliasingSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to