This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 80929d6b549 [SPARK-38832][SQL][FOLLOWUP] Support propagate empty 
expression set for distinct key
80929d6b549 is described below

commit 80929d6b549dfc61ade130a9d59dfa1abe72d681
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Apr 22 18:17:04 2022 +0800

    [SPARK-38832][SQL][FOLLOWUP] Support propagate empty expression set for 
distinct key
    
    ### What changes were proposed in this pull request?
    
    - Improve `DistinctKeyVisitor` to support propagating an empty expression set
    - Small improvement to alias matching
    
    ### Why are the changes needed?
    
    Allow distinct keys to be used to optimize more cases; see the comment at
https://github.com/apache/spark/pull/36117#discussion_r853755920
    
    ### Does this PR introduce _any_ user-facing change?
    
    Improve performance
    
    ### How was this patch tested?
    
    Added tests to `DistinctKeyVisitorSuite`.
    
    Closes #36281 from ulysses-you/SPARK-38832-followup.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../plans/logical/DistinctKeyVisitor.scala         | 25 ++++++++++------------
 .../plans/logical/LogicalPlanDistinctKeys.scala    |  8 +------
 .../plans/logical/DistinctKeyVisitorSuite.scala    |  6 +++++-
 3 files changed, 17 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
index 5b25a326831..1f495688bc5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
@@ -29,25 +29,21 @@ object DistinctKeyVisitor extends 
LogicalPlanVisitor[Set[ExpressionSet]] {
   private def projectDistinctKeys(
       keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): 
Set[ExpressionSet] = {
     val outputSet = ExpressionSet(projectList.map(_.toAttribute))
-    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    val aliases = projectList.collect {
+      // TODO: Expand distinctKeys for redundant aliases on the same expression
+      case alias: Alias if alias.child.deterministic => 
alias.child.canonicalized -> alias
+    }.toMap
     if (aliases.isEmpty) {
       keys.filter(_.subsetOf(outputSet))
     } else {
-      val aliasedDistinctKeys = keys.map { expressionSet =>
-        expressionSet.map { expression =>
-          expression transform {
-            case expr: Expression =>
-              // TODO: Expand distinctKeys for redundant aliases on the same 
expression
-              aliases
-                .collectFirst { case a: Alias if a.child.semanticEquals(expr) 
=> a.toAttribute }
-                .getOrElse(expr)
-          }
-        }
-      }
+      val aliasedDistinctKeys = keys.map(_.map(_.transform {
+        case expr: Expression =>
+          aliases.get(expr.canonicalized).map(_.toAttribute).getOrElse(expr)
+      }))
       aliasedDistinctKeys.collect {
         case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
       } ++ keys.filter(_.subsetOf(outputSet))
-    }.filter(_.nonEmpty)
+    }
   }
 
   /**
@@ -69,7 +65,8 @@ object DistinctKeyVisitor extends 
LogicalPlanVisitor[Set[ExpressionSet]] {
   override def default(p: LogicalPlan): Set[ExpressionSet] = 
Set.empty[ExpressionSet]
 
   override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
-    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by 
a, a
+    // handle group by a, a and global aggregate
+    val groupingExps = ExpressionSet(p.groupingExpressions)
     projectDistinctKeys(addDistinctKey(p.child.distinctKeys, groupingExps), 
p.aggregateExpressions)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
index 2ffa5a0e594..1843c2da478 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
@@ -29,12 +29,6 @@ import 
org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED
  */
 trait LogicalPlanDistinctKeys { self: LogicalPlan =>
   lazy val distinctKeys: Set[ExpressionSet] = {
-    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
-      val keys = DistinctKeyVisitor.visit(self)
-      require(keys.forall(_.nonEmpty))
-      keys
-    } else {
-      Set.empty
-    }
+    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) 
DistinctKeyVisitor.visit(self) else Set.empty
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
index bbf69ae622b..55ce5dcd72a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
@@ -61,7 +61,11 @@ class DistinctKeyVisitorSuite extends PlanTest {
     checkDistinctAttributes(t1.groupBy($"a")($"a", max($"b")), 
Set(ExpressionSet(Seq(a))))
     checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", d, e),
       Set(ExpressionSet(Seq(a, b)), ExpressionSet(Seq(d.toAttribute, 
e.toAttribute))))
-    checkDistinctAttributes(t1.groupBy()(sum($"c")), Set.empty)
+    checkDistinctAttributes(t1.groupBy()(sum($"c")), Set(ExpressionSet()))
+    // ExpressionSet() is a subset of anything, so we do not need 
ExpressionSet(c2)
+    checkDistinctAttributes(t1.groupBy()(sum($"c") as 
"c2").groupBy($"c2")("c2"),
+      Set(ExpressionSet()))
+    checkDistinctAttributes(t1.groupBy()(), Set(ExpressionSet()))
     checkDistinctAttributes(t1.groupBy($"a")($"a", $"a" % 10, d, sum($"b")),
       Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(d.toAttribute))))
     checkDistinctAttributes(t1.groupBy(f.child, $"b")(f, $"b", sum($"c")),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to