This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8115da1a2bb [SPARK-46609][SQL] Avoid exponential explosion in PartitioningPreservingUnaryExecNode
f8115da1a2bb is described below

commit f8115da1a2bb33e6344dd69cc38ca7a68c3654b1
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Jan 5 11:23:23 2024 -0800

    [SPARK-46609][SQL] Avoid exponential explosion in PartitioningPreservingUnaryExecNode
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/37525 . When
    expanding the output partitioning/ordering with aliases, we have a threshold
    to avoid exponential explosion. However, we missed applying this threshold
    in one place. This PR fixes it.
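
    For intuition, an illustrative sketch (not code from this patch): if an
    expression references the same attribute at m positions and each position
    can be rewritten with any of k aliases, naive substitution yields k^m
    candidate partitionings, which is why the expansion must be capped:

    ```scala
    // Illustrative only: k aliases substituted at m positions => k^m candidates.
    val aliases = Seq("a", "b")          // k = 2 aliases for the same attribute
    val positions = 3                    // m = 3 references to that attribute
    val candidates: LazyList[String] =
      (1 to positions).foldLeft(LazyList("")) { (acc, _) =>
        acc.flatMap(prefix => aliases.map(alias => prefix + alias))
      }
    println(candidates.size)             // 2^3 = 8, exponential in m
    println(candidates.take(2).toList)   // a candidate limit keeps this bounded
    ```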
    
    ### Why are the changes needed?
    
    To avoid OOM caused by the unbounded expansion of candidate partitionings.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    A new unit test in ProjectedOrderingAndPartitioningSuite (see the diff below).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44614 from cloud-fan/oom.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/AliasAwareOutputExpression.scala |  4 +--
 .../ProjectedOrderingAndPartitioningSuite.scala    | 30 ++++++++++++++++++++--
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 67d2849e005d..3b847b5852b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -30,7 +30,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
   with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
     val partitionings: Seq[Partitioning] = if (hasAlias) {
-      flattenPartitioning(child.outputPartitioning).flatMap {
+      flattenPartitioning(child.outputPartitioning).iterator.flatMap {
         case e: Expression =>
           // We need unique partitionings but if the input partitioning is
          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
@@ -44,7 +44,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
             .take(aliasCandidateLimit)
             .asInstanceOf[LazyList[Partitioning]]
         case o => Seq(o)
-      }
+      }.take(aliasCandidateLimit).toSeq
     } else {
      // Filter valid partitiongs (only reference output attributes of the current plan node)
       val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
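
A note on the mechanism (a reading of the hunk above, not text from the
commit): `Seq#flatMap` is strict, so every expanded partitioning used to be
materialized before any limit could apply; going through `.iterator` makes the
`flatMap` lazy, so the appended `.take(aliasCandidateLimit)` stops the
expansion after the first few candidates. A minimal standalone Scala sketch of
the difference, using a hypothetical `expand` helper that fans each element
out:

    // Hypothetical helper: each input fans out into two elements.
    def expand(n: Int): Seq[Int] = { println(s"expanding $n"); Seq(n, n) }

    Seq(1, 2, 3).flatMap(expand).take(2)                  // strict: all three inputs expand first
    Seq(1, 2, 3).iterator.flatMap(expand).take(2).toSeq   // lazy: only "expanding 1" runs
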
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index f5839e997560..ec13d48d45f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
 
 class ProjectedOrderingAndPartitioningSuite
   extends SharedSparkSession with AdaptiveSparkPlanHelper {
@@ -101,6 +104,22 @@ class ProjectedOrderingAndPartitioningSuite
     }
   }
 
+  test("SPARK-46609: Avoid exponential explosion in 
PartitioningPreservingUnaryExecNode") {
+    withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> "2") {
+      val output = Seq(AttributeReference("a", StringType)(), AttributeReference("b", StringType)())
+      val plan = ProjectExec(
+        Seq(
+          Alias(output(0), "a1")(),
+          Alias(output(0), "a2")(),
+          Alias(output(1), "b1")(),
+          Alias(output(1), "b2")()
+        ),
+        DummyLeafPlanExec(output)
+      )
+      assert(plan.outputPartitioning.asInstanceOf[PartitioningCollection].partitionings.length == 2)
+    }
+  }
+
   test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to 
complex " +
     "expressions") {
     val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as a", "id + id as b")
@@ -192,3 +211,10 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.sameOrderExpressions.size == 0)
   }
 }
+
+private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = null
+  override def outputPartitioning: Partitioning = {
+    PartitioningCollection(output.map(attr => HashPartitioning(Seq(attr), 4)))
+  }
+}
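
For reference, the cap exercised by the new test is SQLConf's
EXPRESSION_PROJECTION_CANDIDATE_LIMIT. Assuming its key is
`spark.sql.optimizer.expressionProjectionCandidateLimit` (verify against the
SQLConf of your Spark version), it can be tuned like so:

    // Assumed key; check SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT in your version.
    spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "100")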


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
