spark git commit: [SPARK-23301][SQL] data source column pruning should work for arbitrary expressions

2018-02-01 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master b3a04283f -> 19c7c7ebd


[SPARK-23301][SQL] data source column pruning should work for arbitrary 
expressions

## What changes were proposed in this pull request?

This PR fixes a mistake in the `PushDownOperatorsToDataSource` rule: its column
pruning logic handles `Project` incorrectly.
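
As an illustration (not part of this commit): the old logic kept only project-list entries that were themselves attributes required by the parent, so a `Project` over an arbitrary expression such as `'i + 1` contributed no column references, and the source could prune away a column the expression still needs. A minimal sketch of such a query, assuming a local `SparkSession` and the `AdvancedDataSourceV2` test source from `DataSourceV2Suite.scala` (it implements `SupportsPushDownRequiredColumns` and exposes two int columns `i` and `j`):

```scala
import org.apache.spark.sql.SparkSession

object ColumnPruningRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-23301 illustration")
      .getOrCreate()
    import spark.implicits._

    // Assumption: the AdvancedDataSourceV2 test class is on the classpath
    // (it lives in Spark's sql/core test sources).
    val df = spark.read
      .format("org.apache.spark.sql.sources.v2.AdvancedDataSourceV2")
      .load()

    // The project list holds an arbitrary expression ('i + 1), not a bare attribute.
    // With the fix, the expression's references ('i) are pushed down, so the reader
    // only has to produce column i.
    val q = df.select(('i + 1).as("res"))
    q.explain(true)
    q.show()

    spark.stop()
  }
}
```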

## How was this patch tested?

A new test case for column pruning with arbitrary expressions, plus improvements to the
existing tests to make sure `PushDownOperatorsToDataSource` really works.
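
The kind of check the strengthened tests perform can be sketched as follows; the helper below is hypothetical and assumes the optimized plan contains exactly one `DataSourceV2Relation` whose reader supports column pruning:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns

object PushDownInspection {
  // Pull the DataSourceV2 reader out of the optimized plan so a test can inspect
  // what PushDownOperatorsToDataSource actually pushed down to it.
  def getPushDownReader(df: DataFrame): SupportsPushDownRequiredColumns = {
    df.queryExecution.optimizedPlan.collect {
      case r: DataSourceV2Relation => r.reader
    }.collect {
      case p: SupportsPushDownRequiredColumns => p
    }.head
  }

  // Usage sketch: for df.select('i + 1), the schema handed to pruneColumns()
  // (and kept by the test reader) should contain only column "i".
}
```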

Author: Wenchen Fan 

Closes #20476 from cloud-fan/push-down.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19c7c7eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19c7c7eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19c7c7eb

Branch: refs/heads/master
Commit: 19c7c7ebdef6c1c7a02ebac9af6a24f521b52c37
Parents: b3a0428
Author: Wenchen Fan 
Authored: Thu Feb 1 20:44:46 2018 -0800
Committer: gatorsmile 
Committed: Thu Feb 1 20:44:46 2018 -0800

--
 .../v2/PushDownOperatorsToDataSource.scala  |  53 +
 .../sources/v2/JavaAdvancedDataSourceV2.java|  29 -
 .../sql/sources/v2/DataSourceV2Suite.scala  | 113 +--
 3 files changed, 155 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/19c7c7eb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index df034ad..566a483 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -81,35 +81,34 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
 
     // TODO: add more push down rules.
 
-    // TODO: nested fields pruning
-    def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = {
-      plan match {
-        case Project(projectList, child) =>
-          val required = projectList.filter(requiredByParent.contains).flatMap(_.references)
-          pushDownRequiredColumns(child, required)
-
-        case Filter(condition, child) =>
-          val required = requiredByParent ++ condition.references
-          pushDownRequiredColumns(child, required)
-
-        case DataSourceV2Relation(fullOutput, reader) => reader match {
-          case r: SupportsPushDownRequiredColumns =>
-            // Match original case of attributes.
-            val attrMap = AttributeMap(fullOutput.zip(fullOutput))
-            val requiredColumns = requiredByParent.map(attrMap)
-            r.pruneColumns(requiredColumns.toStructType)
-          case _ =>
-        }
+    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
+    RemoveRedundantProject(filterPushed)
+  }
+
+  // TODO: nested fields pruning
+  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+    plan match {
+      case Project(projectList, child) =>
+        val required = projectList.flatMap(_.references)
+        pushDownRequiredColumns(child, AttributeSet(required))
+
+      case Filter(condition, child) =>
+        val required = requiredByParent ++ condition.references
+        pushDownRequiredColumns(child, required)
 
-        // TODO: there may be more operators can be used to calculate required columns, we can add
-        // more and more in the future.
-        case _ => plan.children.foreach(child => pushDownRequiredColumns(child, child.output))
+      case relation: DataSourceV2Relation => relation.reader match {
+        case reader: SupportsPushDownRequiredColumns =>
+          val requiredColumns = relation.output.filter(requiredByParent.contains)
+          reader.pruneColumns(requiredColumns.toStructType)

spark git commit: [SPARK-23301][SQL] data source column pruning should work for arbitrary expressions

2018-02-01 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7baae3aef -> 2b07452ca


[SPARK-23301][SQL] data source column pruning should work for arbitrary 
expressions

This PR fixes a mistake in the `PushDownOperatorsToDataSource` rule: its column
pruning logic handles `Project` incorrectly.

A new test case for column pruning with arbitrary expressions, plus improvements to the
existing tests to make sure `PushDownOperatorsToDataSource` really works.

Author: Wenchen Fan 

Closes #20476 from cloud-fan/push-down.

(cherry picked from commit 19c7c7ebdef6c1c7a02ebac9af6a24f521b52c37)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b07452c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b07452c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b07452c

Branch: refs/heads/branch-2.3
Commit: 2b07452cacb4c226c815a216c4cfea2a04227700
Parents: 7baae3a
Author: Wenchen Fan 
Authored: Thu Feb 1 20:44:46 2018 -0800
Committer: gatorsmile 
Committed: Thu Feb 1 20:46:09 2018 -0800

--
 .../v2/PushDownOperatorsToDataSource.scala  |  53 +
 .../sources/v2/JavaAdvancedDataSourceV2.java|  29 -
 .../sql/sources/v2/DataSourceV2Suite.scala  | 111 +--
 3 files changed, 154 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b07452c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index df034ad..566a483 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -81,35 +81,34 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
 
     // TODO: add more push down rules.
 
-    // TODO: nested fields pruning
-    def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = {
-      plan match {
-        case Project(projectList, child) =>
-          val required = projectList.filter(requiredByParent.contains).flatMap(_.references)
-          pushDownRequiredColumns(child, required)
-
-        case Filter(condition, child) =>
-          val required = requiredByParent ++ condition.references
-          pushDownRequiredColumns(child, required)
-
-        case DataSourceV2Relation(fullOutput, reader) => reader match {
-          case r: SupportsPushDownRequiredColumns =>
-            // Match original case of attributes.
-            val attrMap = AttributeMap(fullOutput.zip(fullOutput))
-            val requiredColumns = requiredByParent.map(attrMap)
-            r.pruneColumns(requiredColumns.toStructType)
-          case _ =>
-        }
+    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
+    RemoveRedundantProject(filterPushed)
+  }
+
+  // TODO: nested fields pruning
+  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+    plan match {
+      case Project(projectList, child) =>
+        val required = projectList.flatMap(_.references)
+        pushDownRequiredColumns(child, AttributeSet(required))
+
+      case Filter(condition, child) =>
+        val required = requiredByParent ++ condition.references
+        pushDownRequiredColumns(child, required)
 
-        // TODO: there may be more operators can be used to calculate required columns, we can add
-        // more and more in the future.
-        case _ => plan.children.foreach(child => pushDownRequiredColumns(child, child.output))
+      case relation: DataSourceV2Relation => relation.reader match {
+        case reader: SupportsPushDownRequiredColumns =>
+          val requiredColumns = relation.output.filter(requiredByParent.contains)
+          reader.pruneColumns(requiredColumns.toStructType)