Repository: spark
Updated Branches:
  refs/heads/master ba437fc5c -> 0f0d1865f


[SPARK-24402][SQL] Optimize `In` expression when only one element in the 
collection or collection is empty

## What changes were proposed in this pull request?

Two new rules in the logical plan optimizers are added.

1. When there is only one element in the **`Collection`**, the
physical plan will be optimized to **`EqualTo`**, so predicate
pushdown can be used.

```scala
    profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
    """
      |== Physical Plan ==
      |*(1) Project [profileID#0]
      |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
      |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
      |     PartitionFilters: [],
      |     PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
      |     ReadSchema: struct<profileID:int>
    """.stripMargin
```

2. When the **`Collection`** is empty, and the input is nullable, the
logical plan will be simplified to

```scala
    profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
    """
      |== Optimized Logical Plan ==
      |Filter if (isnull(profileID#0)) null else false
      |+- Relation[profileID#0] parquet
    """.stripMargin
```

TODO:

1. For multiple conditions with numbers less than certain thresholds,
we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`**
when the numbers of the categories are low, and they are **`Int`**,
**`Long`**.
3. The default immutable hash trees set is slow for query, and we
should do benchmark for using different set implementation for faster
query.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Couple new tests are added.

Author: DB Tsai <d_t...@apple.com>

Closes #21442 from dbtsai/optimize-in.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f0d1865
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f0d1865
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f0d1865

Branch: refs/heads/master
Commit: 0f0d1865f581a9158d73505471953656b173beba
Parents: ba437fc
Author: DB Tsai <d_t...@apple.com>
Authored: Mon Jul 16 15:33:39 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Mon Jul 16 15:33:39 2018 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/expressions.scala    | 13 +++++---
 .../catalyst/optimizer/OptimizeInSuite.scala    | 32 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f0d1865/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 1d363b8..f78a0ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -218,15 +218,20 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
       case expr @ In(v, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
-        if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
+        if (newList.length == 1 && !newList.isInstanceOf[ListQuery]) {
+          EqualTo(v, newList.head)
+        } else if (newList.length > 
SQLConf.get.optimizerInSetConversionThreshold) {
           val hSet = newList.map(e => e.eval(EmptyRow))
           InSet(v, HashSet() ++ hSet)
-        } else if (newList.size < list.size) {
+        } else if (newList.length < list.length) {
           expr.copy(list = newList)
-        } else { // newList.length == list.length
+        } else { // newList.length == list.length && newList.length > 1
           expr
         }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f0d1865/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 478118e..86522a6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -176,6 +176,21 @@ class OptimizeInSuite extends PlanTest {
     }
   }
 
+  test("OptimizedIn test: one element in list gets transformed to EqualTo.") {
+    val originalQuery =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1))))
+        .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer =
+      testRelation
+        .where(EqualTo(UnresolvedAttribute("a"), Literal(1)))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("OptimizedIn test: In empty list gets transformed to FalseLiteral " +
     "when value is not nullable") {
     val originalQuery =
@@ -191,4 +206,21 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("OptimizedIn test: In empty list gets transformed to `If` expression " +
+    "when value is nullable") {
+    val originalQuery =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Nil))
+        .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer =
+      testRelation
+        .where(If(IsNotNull(UnresolvedAttribute("a")),
+          Literal(false), Literal.create(null, BooleanType)))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to