Repository: spark
Updated Branches:
  refs/heads/master 9267bc68f -> 6f710f9fd


[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter

Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'

Current plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
+- Filter (col0#0 = xxx)
   +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

This patch enables the plan below:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```
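
For context, `unhandledFilters` is the `BaseRelation` hook that lets a data source declare which pushed filters it evaluates completely on its own, so the planner can drop the redundant Spark-side Filter above the scan. A minimal sketch of the contract (the relation and its `canCompile` helper are illustrative, not the actual JDBC code):

```scala
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}

abstract class PushdownRelation extends BaseRelation {
  // Illustrative stand-in for JDBCRDD.compileFilter: true if this source
  // can evaluate the predicate in its own WHERE clause.
  private def canCompile(f: Filter): Boolean = f match {
    case EqualTo(_, _) => true // e.g. col0 = 'xxx'
    case _             => false
  }

  // Return only the filters this source CANNOT evaluate itself; Spark plans
  // a Filter node for exactly these, and none when the result is empty.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(canCompile)
}
```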

Author: Takeshi YAMAMURO <linguin....@gmail.com>

Closes #10427 from maropu/RemoveFilterInJdbcScan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f710f9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f710f9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f710f9f

Branch: refs/heads/master
Commit: 6f710f9fd4f85370557b7705020ff16f2385e645
Parents: 9267bc6
Author: Takeshi YAMAMURO <linguin....@gmail.com>
Authored: Wed Feb 10 09:45:13 2016 +0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Feb 10 09:45:13 2016 +0800

----------------------------------------------------------------------
 .../execution/datasources/jdbc/JDBCRDD.scala    |  2 +-
 .../datasources/jdbc/JDBCRelation.scala         |  5 ++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 70 ++++++++++++++------
 3 files changed, 56 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index d867e14..befba86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
    * Turns a single Filter into a String representing a SQL expression.
    * Returns None for an unhandled filter.
    */
-  private def compileFilter(f: Filter): Option[String] = {
+  private[jdbc] def compileFilter(f: Filter): Option[String] = {
     Option(f match {
       case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
       case EqualNullSafe(attr, value) =>
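
For intuition, a hedged sketch of what such a filter compiler looks like (simplified cases only; the real `JDBCRDD.compileFilter` handles many more `Filter` subclasses and quotes values properly):

```scala
import org.apache.spark.sql.sources._

// Some(...) means the predicate can go into the pushed-down WHERE clause;
// None means Spark must still evaluate it after the scan.
def compile(f: Filter): Option[String] = f match {
  case EqualTo(attr, v: String) => Some(s"$attr = '$v'")
  case GreaterThan(attr, v)     => Some(s"$attr > $v")
  case IsNull(attr)             => Some(s"$attr IS NULL")
  case _                        => None
}

// compile(EqualTo("col0", "xxx"))    res: Some(col0 = 'xxx')
// compile(StringContains("a", "b"))  res: None in this sketch
```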

http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 572be82..ee6373d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
+  // Check if JDBCRDD.compileFilter can accept input filters
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
+  }
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
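
With this override in place, the planner can split the translated predicates into handled and unhandled sets, roughly as follows (an illustrative helper, not Spark's actual DataSourceStrategy code):

```scala
import org.apache.spark.sql.sources.{BaseRelation, Filter}

// Handled filters are enforced by the source's scan alone; only the
// unhandled remainder still needs a Spark Filter node above the scan.
def splitFilters(relation: BaseRelation,
                 pushed: Array[Filter]): (Array[Filter], Array[Filter]) = {
  val unhandled = relation.unhandledFilters(pushed).toSet
  pushed.partition(f => !unhandled.contains(f))
}
```

When every pushed predicate compiles, `unhandledFilters` returns an empty array and the physical plan is just the scan, as shown in the commit message above.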

http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5186075..7a0f7ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.sources._
@@ -183,26 +183,34 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    def checkPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check that the Spark-side Filter is removed from the physical plan and
+      // that the plan only has a PhysicalRDD to scan the JDBCRelation.
+      assert(parentPlan.isInstanceOf[PhysicalRDD])
+      assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      df
+    }
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
 
     // This is a test to reflect discussion in SPARK-12218.
     // Older versions of Spark have this kind of bug in the Parquet data source.
@@ -210,6 +218,28 @@ class JDBCSuite extends SparkFunSuite
     val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
     assert(df1.collect.toSet === Set(Row("mary", 2)))
     assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+    def checkNotPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check that the Spark-side Filter is kept in the physical plan because
+      // JDBCRDD cannot compile the given predicates.
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+      assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+      df
+    }
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
+  }
+
+  test("SELECT COUNT(1) WHERE (predicates)") {
+    // Check that the answer is correct when Filter is removed from operations such as
+    // count(), which require no columns. In some data sources, e.g. Parquet,
+    // `requiredColumns` in org.apache.spark.sql.sources.interfaces is not given in
+    // logical plans, but filters are still applied on those columns, so removing the
+    // Spark-side Filter would produce wrong results. JDBCRDD handles this case
+    // correctly by assigning `requiredColumns` properly. See PR 10427 for more discussion.
+    assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
   }
 
   test("SELECT * WHERE (quoted strings)") {

