spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable
Repository: spark Updated Branches: refs/heads/branch-1.6 fbe65c592 -> 90d71bff0 [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable …ishable Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply Author: Zee Chen Closes #9679 from zeocio/spark-11390. (cherry picked from commit 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d71bff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d71bff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d71bff Branch: refs/heads/branch-1.6 Commit: 90d71bff0c583830aa3fd96b1dd3607f0cb0cbee Parents: fbe65c5 Author: Zee Chen Authored: Mon Nov 16 14:21:28 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 14:21:41 2015 -0800 -- .../org/apache/spark/sql/execution/ExistingRDD.scala | 6 -- .../execution/datasources/DataSourceStrategy.scala| 6 -- .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8b41d3d..62620ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,9 @@ private[sql] object PhysicalRDD { def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], - relation: BaseRelation): PhysicalRDD = { -PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation]) + relation: BaseRelation, + extraInformation: String = ""): PhysicalRDD = { +PhysicalRDD(output, rdd, relation.toString + extraInformation, + 
relation.isInstanceOf[HadoopFsRelation]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bbbfa7..544d5ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) +val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ") + if (projects.map(_.toAttribute) == projects && projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { @@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. 
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) execution.Project( projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be53ec3..dfec139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++
spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable
Repository: spark Updated Branches: refs/heads/master b1a966262 -> 985b38dd2 [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable …ishable Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply Author: Zee Chen Closes #9679 from zeocio/spark-11390. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/985b38dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/985b38dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/985b38dd Branch: refs/heads/master Commit: 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181 Parents: b1a9662 Author: Zee Chen Authored: Mon Nov 16 14:21:28 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 14:21:28 2015 -0800 -- .../org/apache/spark/sql/execution/ExistingRDD.scala | 6 -- .../execution/datasources/DataSourceStrategy.scala| 6 -- .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8b41d3d..62620ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,9 @@ private[sql] object PhysicalRDD { def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], - relation: BaseRelation): PhysicalRDD = { -PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation]) + relation: BaseRelation, + extraInformation: String = ""): PhysicalRDD = { +PhysicalRDD(output, rdd, relation.toString + extraInformation, + relation.isInstanceOf[HadoopFsRelation]) } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bbbfa7..544d5ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) +val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ") + if (projects.map(_.toAttribute) == projects && projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { @@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. 
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) execution.Project( projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be53ec3..dfec139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext { } }