Repository: spark
Updated Branches:
  refs/heads/master 132a3f470 -> 1e17ab83d


[SPARK-22662][SQL] Failed to prune columns after rewriting predicate subquery

## What changes were proposed in this pull request?

As a simple example:
```
spark-sql> create table base (a int, b int) using parquet;
Time taken: 0.066 seconds
spark-sql> create table relInSubq ( x int, y int, z int) using parquet;
Time taken: 0.042 seconds
spark-sql> explain select a from base where a in (select x from relInSubq);
== Physical Plan ==
*Project [a#83]
+- *BroadcastHashJoin [a#83], [x#85], LeftSemi, BuildRight
   :- *FileScan parquet default.base[a#83,b#84] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://100.0.0.4:9000/wzh/base], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *Project [x#85]
         +- *FileScan parquet default.relinsubq[x#85] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://100.0.0.4:9000/wzh/relinsubq], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:int>
```
We only need column `a` in table `base`, but all columns (`a`, `b`) are fetched.

The reason is that, in the "Operator Optimizations" batch, `ColumnPruning` first 
produces a `Project` on table `base`, but `removeProjectBeforeFilter` then 
removes it because, at that point, the predicate subquery is still in filter 
form. Later, in the "RewriteSubquery" batch, `RewritePredicateSubquery` 
converts the subquery into a LeftSemi join, but this batch doesn't include the 
`ColumnPruning` rule, so all columns of the `base` table are read.
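
The ordering problem can be sketched with a small toy model. Everything in it is a hypothetical stand-in rather than Catalyst code: `Plan`, `Scan`, `FilterIn`, `SemiJoin`, `rewriteSubquery`, `pruneColumns`, and `columnsRead` are invented for illustration, and `columnsRead` assumes that only a `Project` sitting directly on a scan narrows what is read.

```scala
// Toy model only: these types are hypothetical stand-ins, not Catalyst classes.
object PruningSketch {
  sealed trait Plan
  case class Scan(table: String, cols: List[String]) extends Plan
  case class Project(cols: List[String], child: Plan) extends Plan
  // Filter of the form `key IN (SELECT subKey FROM subquery)`.
  case class FilterIn(key: String, subKey: String, subquery: Plan, child: Plan) extends Plan
  case class SemiJoin(left: Plan, right: Plan, leftKey: String, rightKey: String) extends Plan

  // Stand-in for RewritePredicateSubquery: the IN-subquery filter becomes a left semi join.
  def rewriteSubquery(plan: Plan): Plan = plan match {
    case FilterIn(k, sk, sub, child) => SemiJoin(rewriteSubquery(child), sub, k, sk)
    case Project(cols, child)        => Project(cols, rewriteSubquery(child))
    case other                       => other
  }

  // Stand-in for ColumnPruning: add a Project under the left side of a semi join
  // when the parent Project needs fewer columns than the scan provides.
  def pruneColumns(plan: Plan): Plan = plan match {
    case Project(cols, SemiJoin(scan @ Scan(_, all), right, lk, rk)) =>
      val needed = all.filter(c => cols.contains(c) || c == lk)
      val left = if (needed == all) scan else Project(needed, scan)
      Project(cols, SemiJoin(left, right, lk, rk))
    case other => other
  }

  // Assumed scan behavior: only a Project directly above a Scan narrows what is read.
  def columnsRead(plan: Plan): Map[String, List[String]] = plan match {
    case Project(cols, Scan(t, all)) => Map(t -> all.filter(c => cols.contains(c)))
    case Scan(t, all)                => Map(t -> all)
    case Project(_, child)           => columnsRead(child)
    case FilterIn(_, _, sub, child)  => columnsRead(child) ++ columnsRead(sub)
    case SemiJoin(l, r, _, _)        => columnsRead(l) ++ columnsRead(r)
  }

  val base = Scan("base", List("a", "b"))
  val relInSubq = Scan("relInSubq", List("x", "y", "z"))

  // select a from base where a in (select x from relInSubq)
  val query: Plan =
    Project(List("a"), FilterIn("a", "x", Project(List("x"), relInSubq), base))

  val rewritten: Plan = rewriteSubquery(query)   // rewrite only, no pruning afterwards
  val pruned: Plan = pruneColumns(rewritten)     // rewrite followed by pruning
}
```

In this model, `columnsRead(rewritten)` reports both `a` and `b` for `base`, while `columnsRead(pruned)` reports only `a`. That mirrors why the pruning rule needs to run again after the subquery rewrite.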

## How was this patch tested?
Added a new test case.

Author: Zhenhua Wang <wangzhen...@huawei.com>

Closes #19855 from wzhfy/column_pruning_subquery.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e17ab83
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e17ab83
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e17ab83

Branch: refs/heads/master
Commit: 1e17ab83de29bca1823a537d7c57ffc4de8a26ee
Parents: 132a3f4
Author: Zhenhua Wang <wangzhen...@huawei.com>
Authored: Tue Dec 5 15:15:32 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Dec 5 15:15:32 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  4 +-
 .../optimizer/RewriteSubquerySuite.scala        | 55 ++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e17ab83/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8a5c486..484cd8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -141,7 +141,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       CheckCartesianProducts) ::
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
-      CollapseProject) :: Nil
+      ColumnPruning,
+      CollapseProject,
+      RemoveRedundantProject) :: Nil
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1e17ab83/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
new file mode 100644
index 0000000..6b3739c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteSubquerySuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.ListQuery
+import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class RewriteSubquerySuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Column Pruning", FixedPoint(100), ColumnPruning) ::
+      Batch("Rewrite Subquery", FixedPoint(1),
+        RewritePredicateSubquery,
+        ColumnPruning,
+        CollapseProject,
+        RemoveRedundantProject) :: Nil
+  }
+
+  test("Column pruning after rewriting predicate subquery") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val relInSubquery = LocalRelation('x.int, 'y.int, 'z.int)
+
+    val query = relation.where('a.in(ListQuery(relInSubquery.select('x)))).select('a)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = relation
+      .select('a)
+      .join(relInSubquery.select('x), LeftSemi, Some('a === 'x))
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+}

