This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 78314af  [SPARK-27123][SQL] Improve CollapseProject to handle projects cross limit/repartition/sample
78314af is described below

commit 78314af580b38f773a148c6f035d2ddd79896b4c
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Mar 12 21:45:40 2019 +0000

[SPARK-27123][SQL] Improve CollapseProject to handle projects cross limit/repartition/sample

## What changes were proposed in this pull request?

The `CollapseProject` optimizer rule simplifies some plans by merging adjacent projects and performing alias substitution.

```scala
scala> sql("SELECT b c FROM (SELECT a b FROM t)").explain
== Physical Plan ==
*(1) Project [a#5 AS c#1]
+- Scan hive default.t [a#5], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#5]
```

We can do the same for more complex cases like the following. This PR aims to handle adjacent projects across limit/repartition/sample. Here, repartition means `Repartition`, not `RepartitionByExpression`.

**BEFORE**
```scala
scala> sql("SELECT b c FROM (SELECT /*+ REPARTITION(1) */ a b FROM t)").explain
== Physical Plan ==
*(2) Project [b#0 AS c#1]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [a#5 AS b#0]
      +- Scan hive default.t [a#5], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#5]
```

**AFTER**
```scala
scala> sql("SELECT b c FROM (SELECT /*+ REPARTITION(1) */ a b FROM t)").explain
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *(1) Project [a#11 AS c#7]
   +- Scan hive default.t [a#11], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#11]
```

## How was this patch tested?

Passes Jenkins with the newly added and updated test cases.

Closes #24049 from dongjoon-hyun/SPARK-27123.
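For readers who want to try the new behavior without a Hive table, here is a minimal sketch (an illustration mirroring the repartition test case added to `CollapseProjectSuite` below, not an excerpt from this patch; it assumes spark-catalyst is on the classpath) that builds a plan with two renaming projects separated by a `Repartition` and applies the rule directly:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Project('b AS 'c) over Repartition over Project('a AS 'b): the outer project
// merely renames the inner project's output, so the new rule can merge the two
// projects across the Repartition.
val relation = LocalRelation('a.int, 'b.int)
val plan = relation.select('a as 'b).repartition(1).select('b as 'c).analyze

// Rule[LogicalPlan] instances are callable; the result should be
// Repartition(1, ...) over a single Project [a AS c].
println(CollapseProject(plan))
```

If the outer project did more than rename (for example `('b + 1) as 'c`), the new `isRenaming` guard would not match and the repartition/sample cases would leave the plan unchanged.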
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 26 +++++++++++++++++
 .../catalyst/optimizer/CollapseProjectSuite.scala  | 34 +++++++++++++++++++++-
 .../catalyst/optimizer/ColumnPruningSuite.scala    |  2 +-
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 97a53f2..1b7ff02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -699,6 +699,24 @@ object CollapseProject extends Rule[LogicalPlan] {
         agg.copy(aggregateExpressions = buildCleanedProjectList(
           p.projectList, agg.aggregateExpressions))
       }
+    case p1 @ Project(_, g @ GlobalLimit(_, l @ LocalLimit(_, p2: Project))) =>
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
+        p1
+      } else {
+        val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList)
+        g.copy(child = l.copy(child = p2.copy(projectList = newProjectList)))
+      }
+    case p1 @ Project(_, l @ LocalLimit(_, p2: Project)) =>
+      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
+        p1
+      } else {
+        val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList)
+        l.copy(child = p2.copy(projectList = newProjectList))
+      }
+    case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
+      r.copy(child = p.copy(projectList = buildCleanedProjectList(l1, p.projectList)))
+    case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
+      s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList)))
   }
 
   private def collectAliases(projectList: Seq[NamedExpression]): AttributeMap[Alias] = {
@@ -739,6 +757,14 @@ object CollapseProject extends Rule[LogicalPlan] {
       CleanupAliases.trimNonTopLevelAliases(p).asInstanceOf[NamedExpression]
     }
   }
+
+  private def isRenaming(list1: Seq[NamedExpression], list2: Seq[NamedExpression]): Boolean = {
+    list1.length == list2.length && list1.zip(list2).forall {
+      case (e1, e2) if e1.semanticEquals(e2) => true
+      case (Alias(a: Attribute, _), b) if a.metadata == Metadata.empty && a.name == b.name => true
+      case _ => false
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index e7a5bce..42bcd13 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Rand}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.MetadataBuilder
 
@@ -138,4 +138,36 @@ class CollapseProjectSuite extends PlanTest {
     assert(projects.size === 1)
     assert(hasMetadata(optimized))
   }
+
+  test("collapse redundant alias through limit") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('a as 'b).limit(1).select('b as 'c).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.select('a as 'c).limit(1).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("collapse redundant alias through local limit") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = LocalLimit(1, relation.select('a as 'b)).select('b as 'c).analyze
+    val optimized = Optimize.execute(query)
+    val expected = LocalLimit(1, relation.select('a as 'c)).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("collapse redundant alias through repartition") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('a as 'b).repartition(1).select('b as 'c).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.select('a as 'c).repartition(1).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("collapse redundant alias through sample") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = Sample(0.0, 0.6, false, 11L, relation.select('a as 'b)).select('b as 'c).analyze
+    val optimized = Optimize.execute(query)
+    val expected = Sample(0.0, 0.6, false, 11L, relation.select('a as 'c)).analyze
+    comparePlans(optimized, expected)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 41bc4d8..b738f30 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -388,7 +388,7 @@ class ColumnPruningSuite extends PlanTest {
 
     val query2 = Sample(0.0, 0.6, false, 11L, x).select('a as 'aa)
     val optimized2 = Optimize.execute(query2.analyze)
-    val expected2 = Sample(0.0, 0.6, false, 11L, x.select('a)).select('a as 'aa)
+    val expected2 = Sample(0.0, 0.6, false, 11L, x.select('a as 'aa))
     comparePlans(optimized2, expected2.analyze)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org