Repository: spark Updated Branches: refs/heads/master ba437fc5c -> 0f0d1865f
[SPARK-24402][SQL] Optimize `In` expression when only one element in the collection or collection is empty ## What changes were proposed in this pull request? Two new rules in the logical plan optimizers are added. 1. When there is only one element in the **`Collection`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used. ```scala profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true) """ |== Physical Plan == |*(1) Project [profileID#0] |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6)) | +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet, | PartitionFilters: [], | PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)], | ReadSchema: struct<profileID:int> """.stripMargin ``` 2. When the **`Collection`** is empty, and the input is nullable, the logical plan will be simplified to ```scala profileDF.filter( $"profileID".isInCollection(Set())).explain(true) """ |== Optimized Logical Plan == |Filter if (isnull(profileID#0)) null else false |+- Relation[profileID#0] parquet """.stripMargin ``` TODO: 1. For multiple conditions with numbers less than certain thresholds, we should still allow predicate pushdown. 2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`** when the numbers of the categories are low, and they are **`Int`**, **`Long`**. 3. The default immutable hash trees set is slow for query, and we should do benchmark for using different set implementation for faster query. 4. **`filter(if (condition) null else false)`** can be optimized to false. ## How was this patch tested? Couple new tests are added. Author: DB Tsai <d_t...@apple.com> Closes #21442 from dbtsai/optimize-in. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f0d1865 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f0d1865 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f0d1865 Branch: refs/heads/master Commit: 0f0d1865f581a9158d73505471953656b173beba Parents: ba437fc Author: DB Tsai <d_t...@apple.com> Authored: Mon Jul 16 15:33:39 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Mon Jul 16 15:33:39 2018 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/expressions.scala | 13 +++++--- .../catalyst/optimizer/OptimizeInSuite.scala | 32 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0f0d1865/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 1d363b8..f78a0ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -218,15 +218,20 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral + case In(v, list) if list.isEmpty => + // When v is not nullable, the following expression will be optimized + // to FalseLiteral which is tested in OptimizeInSuite.scala + If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType)) case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq - if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { + if (newList.length == 1 && !newList.isInstanceOf[ListQuery]) { + EqualTo(v, newList.head) + } else if (newList.length > SQLConf.get.optimizerInSetConversionThreshold) { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) - } else if (newList.size < list.size) { + } else if (newList.length < list.length) { expr.copy(list = newList) - } else { // newList.length == list.length + } else { // newList.length == list.length && newList.length > 1 expr } } http://git-wip-us.apache.org/repos/asf/spark/blob/0f0d1865/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 478118e..86522a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -176,6 +176,21 @@ class OptimizeInSuite extends PlanTest { } } + test("OptimizedIn test: one element in list gets transformed to EqualTo.") { + val originalQuery = + testRelation + .where(In(UnresolvedAttribute("a"), Seq(Literal(1)))) + .analyze + + val optimized = Optimize.execute(originalQuery) + val correctAnswer = + testRelation + .where(EqualTo(UnresolvedAttribute("a"), Literal(1))) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("OptimizedIn test: In empty list gets transformed to FalseLiteral " + "when value is not nullable") { val originalQuery = @@ -191,4 +206,21 @@ class OptimizeInSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("OptimizedIn test: In empty list gets transformed to `If` expression " + + "when value is nullable") { + val originalQuery = + testRelation + .where(In(UnresolvedAttribute("a"), Nil)) + .analyze + + val optimized = Optimize.execute(originalQuery) + val correctAnswer = + testRelation + .where(If(IsNotNull(UnresolvedAttribute("a")), + Literal(false), Literal.create(null, BooleanType))) + .analyze + + comparePlans(optimized, correctAnswer) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org