Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fbe65c592 -> 90d71bff0


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply
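
For reference, a minimal sketch of the user-visible effect (a hypothetical spark-shell session; the path and column name are illustrative):

    // sqlContext is provided by the shell. Write a tiny Parquet table,
    // then look at the plan for a filtered read:
    sqlContext.range(0, 10).selectExpr("id as key").write.parquet("/tmp/t")
    sqlContext.read.parquet("/tmp/t").filter("key = 15").explain()
    // Before this patch the scan node printed the same with or without
    // filter pushdown; it now also shows e.g. PushedFilter: [EqualTo(key,15)]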

Author: Zee Chen <zeec...@us.ibm.com>

Closes #9679 from zeocio/spark-11390.

(cherry picked from commit 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d71bff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d71bff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d71bff

Branch: refs/heads/branch-1.6
Commit: 90d71bff0c583830aa3fd96b1dd3607f0cb0cbee
Parents: fbe65c5
Author: Zee Chen <zeec...@us.ibm.com>
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Nov 16 14:21:41 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 ++++--
 .../execution/datasources/DataSourceStrategy.scala    |  6 ++++--
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }
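
(The new extraInformation parameter defaults to "", so existing createFromDataSource callers are source-compatible and unaffected; only the DataSourceStrategy call sites below pass the pushed-filter string.)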

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
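
As a quick illustration of the tag built by pushedFiltersString above (the filter classes are the public org.apache.spark.sql.sources API; the concrete filters are illustrative):

    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}

    // mkString(prefix, separator, suffix) renders each pushed filter via its
    // case-class toString, producing the marker the new test greps for:
    val pushed = Seq(EqualTo("key", 15), GreaterThan("value", 3))
    println(pushed.mkString(" PushedFilter: [", ",", "] "))
    // prints:  PushedFilter: [EqualTo(key,15),GreaterThan(value,3)]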

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      testData.write.parquet(path)
+      val df = sqlContext.read.parquet(path)
+      sqlContext.registerDataFrameAsTable(df, "testPushed")
+
+      withTempTable("testPushed") {
+        val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
+        assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+      }
+    }
+  }
+
   test("efficient limit -> project -> sort") {
     {
       val query =
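
A rough interactive equivalent of the new test (spark-shell on this branch; the path is illustrative):

    // Write a small Parquet table, read it back, and check that the pushed
    // filter is visible in the executed plan's string form:
    val src = sqlContext.range(0, 100).selectExpr("id as key", "cast(id as string) as value")
    src.write.parquet("/tmp/testPushed")
    val df = sqlContext.read.parquet("/tmp/testPushed")
    val plan = df.filter("key = 15").queryExecution.executedPlan
    assert(plan.toString.contains("PushedFilter: [EqualTo(key,15)]"))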

