Repository: spark
Updated Branches:
  refs/heads/master 6343f6655 -> a04cab8f1


[SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions

## What changes were proposed in this pull request?

This PR improves `OptimizeIn` optimizer to remove the literal repetitions from 
SQL `IN` predicates. This optimizer prevents user mistakes and also can 
optimize some queries like 
[TPCDS-36](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q36.sql#L19).

**Before**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where 
state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN,TN,TN,TN,TN,TN,TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```

**After**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where 
state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```

## How was this patch tested?

Pass the Jenkins tests (including a new testcase).

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13876 from dongjoon-hyun/SPARK-16174.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a04cab8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a04cab8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a04cab8f

Branch: refs/heads/master
Commit: a04cab8f17fcac05f86d2c472558ab98923f91e3
Parents: 6343f66
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Thu Jul 7 19:45:43 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jul 7 19:45:43 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  1 +
 .../sql/catalyst/optimizer/Optimizer.scala      | 20 +++++++++++-----
 .../catalyst/optimizer/OptimizeInSuite.scala    | 24 ++++++++++++++++++++
 3 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index a3b098a..734bacf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate
   }
 
   override def children: Seq[Expression] = value +: list
+  lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])
 
   override def nullable: Boolean = children.exists(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)

http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9ee1735..03d15ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] {
 }
 
 /**
- * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, 
HashSet[Literal])]]
- * which is much faster
+ * Optimize IN predicates:
+ * 1. Removes literal repetitions.
+ * 2. Replaces [[In (value, seq[Literal])]] with optimized version
+ *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
-          list.size > conf.optimizerInSetConversionThreshold =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        InSet(v, HashSet() ++ hSet)
+      case expr @ In(v, list) if expr.inSetConvertible =>
+        val newList = ExpressionSet(list).toSeq
+        if (newList.size > conf.optimizerInSetConversionThreshold) {
+          val hSet = newList.map(e => e.eval(EmptyRow))
+          InSet(v, HashSet() ++ hSet)
+        } else if (newList.size < list.size) {
+          expr.copy(list = newList)
+        } else { // newList.length == list.length
+          expr
+        }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index f1a4ea8..0877207 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -42,6 +42,30 @@ class OptimizeInSuite extends PlanTest {
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
 
+  test("OptimizedIn test: Remove deterministic repetitions") {
+    val originalQuery =
+      testRelation
+        .where(In(UnresolvedAttribute("a"),
+          Seq(Literal(1), Literal(1), Literal(2), Literal(2), Literal(1), 
Literal(2))))
+        .where(In(UnresolvedAttribute("b"),
+          Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"),
+            Round(UnresolvedAttribute("a"), 0), 
Round(UnresolvedAttribute("a"), 0),
+            Rand(0), Rand(0))))
+        .analyze
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2))))
+        .where(In(UnresolvedAttribute("b"),
+          Seq(UnresolvedAttribute("a"), UnresolvedAttribute("a"),
+            Round(UnresolvedAttribute("a"), 0), 
Round(UnresolvedAttribute("a"), 0),
+            Rand(0), Rand(0))))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("OptimizedIn test: In clause not optimized to InSet when less than 10 
items") {
     val originalQuery =
       testRelation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to