Repository: spark
Updated Branches:
  refs/heads/master 0ad6ce7e5 -> c55397652


[SPARK-16208][SQL] Add `PropagateEmptyRelation` optimizer

## What changes were proposed in this pull request?

This PR adds a new logical optimizer rule, `PropagateEmptyRelation`, to
collapse logical plans consisting only of empty LocalRelations.

**Optimizer Targets**

1. Binary(or Higher)-node Logical Plans
   - Union with all empty children.
   - Join with one or two empty children (including Intersect/Except).
2. Unary-node Logical Plans
   - Project/Filter/Sample/Sort/Limit/Repartition with all empty children.
   - Aggregate with all empty children and without AggregateFunction
     expressions such as COUNT.
   - Generate with Explode only, because other generators such as Hive UDTFs
     may still return results.
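
The targets above can be sketched on a toy plan model. This is only an illustration under stated assumptions: `Plan`, `Local`, `Filter`, `Union`, `Join`, `Aggregate`, and `ToyPropagateEmpty` are hypothetical names, not Catalyst classes, join types are plain strings, and the toy model ignores the output schema that the real rule preserves via `plan.output`.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
sealed trait Plan
case class Local(rows: Seq[Int]) extends Plan                // stands in for LocalRelation
case class Filter(child: Plan) extends Plan                  // a representative unary node
case class Union(children: Seq[Plan]) extends Plan
case class Join(left: Plan, right: Plan, joinType: String) extends Plan
case class Aggregate(child: Plan, hasAggFunction: Boolean) extends Plan

object ToyPropagateEmpty {
  val Empty: Plan = Local(Seq.empty)

  private def isEmpty(p: Plan): Boolean = p match {
    case Local(rows) => rows.isEmpty
    case _ => false
  }

  // Bottom-up rewrite: children are simplified first, then the node itself.
  def propagate(plan: Plan): Plan = plan match {
    case l: Local => l
    case Filter(c) =>
      val child = propagate(c)
      if (isEmpty(child)) Empty else Filter(child)
    case Union(cs) =>
      val children = cs.map(propagate)
      if (children.forall(isEmpty)) Empty else Union(children)
    case Join(l, r, jt) =>
      val (left, right) = (propagate(l), propagate(r))
      // Which empty side collapses the join depends on the join type.
      val collapses = jt match {
        case "inner" => isEmpty(left) || isEmpty(right)
        case "left_outer" | "left_semi" | "left_anti" => isEmpty(left)
        case "right_outer" => isEmpty(right)
        case "full_outer" => isEmpty(left) && isEmpty(right)
        case _ => false
      }
      if (collapses) Empty else Join(left, right, jt)
    case Aggregate(c, hasAgg) =>
      val child = propagate(c)
      // A global COUNT over no rows still yields one row, so an Aggregate
      // that contains an aggregate function is conservatively kept.
      if (isEmpty(child) && !hasAgg) Empty else Aggregate(child, hasAgg)
  }
}
```

For example, `ToyPropagateEmpty.propagate(Join(Local(Seq(1)), Filter(Local(Nil)), "inner"))` collapses to an empty `Local`, while the same plan with `"left_outer"` keeps the join because only the right side is empty.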

**Sample Query**
```sql
WITH t1 AS (SELECT a FROM VALUES 1 t(a)),
     t2 AS (SELECT b FROM VALUES 1 t(b) WHERE 1=2)
SELECT a,b
FROM t1, t2
WHERE a=b
GROUP BY a,b
HAVING a>1
ORDER BY a,b
```

**Before**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
*Sort [a#0 ASC, b#1 ASC], true, 0
+- Exchange rangepartitioning(a#0 ASC, b#1 ASC, 200)
   +- *HashAggregate(keys=[a#0, b#1], functions=[])
      +- Exchange hashpartitioning(a#0, b#1, 200)
         +- *HashAggregate(keys=[a#0, b#1], functions=[])
            +- *BroadcastHashJoin [a#0], [b#1], Inner, BuildRight
               :- *Filter (isnotnull(a#0) && (a#0 > 1))
               :  +- LocalTableScan [a#0]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  +- *Filter (isnotnull(b#1) && (b#1 > 1))
                     +- LocalTableScan <empty>, [b#1]
```

**After**
```scala
scala> sql("with t1 as (select a from values 1 t(a)), t2 as (select b from values 1 t(b) where 1=2) select a,b from t1, t2 where a=b group by a,b having a>1 order by a,b").explain
== Physical Plan ==
LocalTableScan <empty>, [a#0, b#1]
```

## How was this patch tested?

Pass the Jenkins tests (including a new testsuite).

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13906 from dongjoon-hyun/SPARK-16208.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5539765
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5539765
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5539765

Branch: refs/heads/master
Commit: c55397652ad1c6d047a8b8eb7fd92a8a1dc66306
Parents: 0ad6ce7
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri Jul 1 22:13:56 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jul 1 22:13:56 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |   3 +-
 .../optimizer/PropagateEmptyRelation.scala      |  78 +++++++++
 .../optimizer/PropagateEmptyRelationSuite.scala | 162 +++++++++++++++++++
 3 files changed, 242 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 842d6bc..9ee1735 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -113,7 +113,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Typed Filter Optimization", fixedPoint,
       CombineTypedFilters) ::
     Batch("LocalRelation", fixedPoint,
-      ConvertToLocalRelation) ::
+      ConvertToLocalRelation,
+      PropagateEmptyRelation) ::
     Batch("OptimizeCodegen", Once,
       OptimizeCodegen(conf)) ::
     Batch("RewriteSubquery", Once,

http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
new file mode 100644
index 0000000..50076b1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/**
+ * Collapse plans consisting empty local relations generated by [[PruneFilters]].
+ * 1. Binary(or Higher)-node Logical Plans
+ *    - Union with all empty children.
+ *    - Join with one or two empty children (including Intersect/Except).
+ * 2. Unary-node Logical Plans
+ *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
+ *    - Aggregate with all empty children and without AggregateFunction expressions like COUNT.
+ *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
+ */
+object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
+  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
+    case p: LocalRelation => p.data.isEmpty
+    case _ => false
+  }
+
+  private def containsAggregateExpression(e: Expression): Boolean = {
+    e.collectFirst { case _: AggregateFunction => () }.isDefined
+  }
+
+  private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case p: Union if p.children.forall(isEmptyLocalRelation) =>
+      empty(p)
+
+    case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
+      case Inner => empty(p)
+      // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+      // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+      case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
+      case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
+      case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
+      case _ => p
+    }
+
+    case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
+      case _: Project => empty(p)
+      case _: Filter => empty(p)
+      case _: Sample => empty(p)
+      case _: Sort => empty(p)
+      case _: GlobalLimit => empty(p)
+      case _: LocalLimit => empty(p)
+      case _: Repartition => empty(p)
+      case _: RepartitionByExpression => empty(p)
+      // AggregateExpressions like COUNT(*) return their results like 0.
+      case Aggregate(_, ae, _) if !ae.exists(containsAggregateExpression) => empty(p)
+      // Generators like Hive-style UDTF may return their records within `close`.
+      case Generate(_: Explode, _, _, _, _, _) => empty(p)
+      case _ => p
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c5539765/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
new file mode 100644
index 0000000..c549832
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PropagateEmptyRelationSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PropagateEmptyRelation", Once,
+        CombineUnions,
+        ReplaceDistinctWithAggregate,
+        ReplaceExceptWithAntiJoin,
+        ReplaceIntersectWithSemiJoin,
+        PushDownPredicate,
+        PruneFilters,
+        PropagateEmptyRelation) :: Nil
+  }
+
+  object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("OptimizeWithoutPropagateEmptyRelation", Once,
+        CombineUnions,
+        ReplaceDistinctWithAggregate,
+        ReplaceExceptWithAntiJoin,
+        ReplaceIntersectWithSemiJoin,
+        PushDownPredicate,
+        PruneFilters) :: Nil
+  }
+
+  val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
+  val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))
+
+  test("propagate empty relation through Union") {
+    val query = testRelation1
+      .where(false)
+      .union(testRelation2.where(false))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("propagate empty relation through Join") {
+    // Testcases are tuples of (left predicate, right predicate, joinType, correct answer)
+    // Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation.
+    val testcases = Seq(
+      (true, true, Inner, None),
+      (true, true, LeftOuter, None),
+      (true, true, RightOuter, None),
+      (true, true, FullOuter, None),
+      (true, true, LeftAnti, None),
+      (true, true, LeftSemi, None),
+
+      (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (true, false, LeftOuter, None),
+      (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+      (true, false, FullOuter, None),
+      (true, false, LeftAnti, None),
+      (true, false, LeftSemi, None),
+
+      (false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+      (false, true, RightOuter, None),
+      (false, true, FullOuter, None),
+      (false, true, LeftAnti, Some(LocalRelation('a.int))),
+      (false, true, LeftSemi, Some(LocalRelation('a.int))),
+
+      (false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+      (false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
+      (false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
+      (false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
+      (false, false, LeftAnti, Some(LocalRelation('a.int))),
+      (false, false, LeftSemi, Some(LocalRelation('a.int)))
+    )
+
+    testcases.foreach { case (left, right, jt, answer) =>
+      val query = testRelation1
+        .where(left)
+        .join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr))
+      val optimized = Optimize.execute(query.analyze)
+      val correctAnswer =
+        answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("propagate empty relation through UnaryNode") {
+    val query = testRelation1
+      .where(false)
+      .select('a)
+      .groupBy('a)('a)
+      .where('a > 1)
+      .orderBy('a.asc)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("don't propagate non-empty local relation") {
+    val query = testRelation1
+      .where(true)
+      .groupBy('a)('a)
+      .where('a > 1)
+      .orderBy('a.asc)
+      .select('a)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation1
+      .where('a > 1)
+      .groupBy('a)('a)
+      .orderBy('a.asc)
+      .select('a)
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
+
+  test("propagate empty relation through Aggregate without aggregate function") {
+    val query = testRelation1
+      .where(false)
+      .groupBy('a)('a, ('a + 1).as('x))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int, 'x.int).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("don't propagate empty relation through Aggregate with aggregate function") {
+    val query = testRelation1
+      .where(false)
+      .groupBy('a)(count('a))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int).groupBy('a)(count('a)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
