Repository: spark Updated Branches: refs/heads/master 9267bc68f -> 6f710f9fd
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx' Current plan: ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == +- Filter (col0#0 = xxx) +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` This patch enables a plan below; ``` == Optimized Logical Plan == Project [col0#0,col1#1] +- Filter (col0#0 = xxx) +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver}) == Physical Plan == Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)] ``` Author: Takeshi YAMAMURO <linguin....@gmail.com> Closes #10427 from maropu/RemoveFilterInJdbcScan. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f710f9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f710f9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f710f9f Branch: refs/heads/master Commit: 6f710f9fd4f85370557b7705020ff16f2385e645 Parents: 9267bc6 Author: Takeshi YAMAMURO <linguin....@gmail.com> Authored: Wed Feb 10 09:45:13 2016 +0800 Committer: Yin Huai <yh...@databricks.com> Committed: Wed Feb 10 09:45:13 2016 +0800 ---------------------------------------------------------------------- .../execution/datasources/jdbc/JDBCRDD.scala | 2 +- .../datasources/jdbc/JDBCRelation.scala | 5 ++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 70 ++++++++++++++------ 3 files changed, 56 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index d867e14..befba86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging { * Turns a single Filter into a String representing a SQL expression. * Returns None for an unhandled filter. */ - private def compileFilter(f: Filter): Option[String] = { + private[jdbc] def compileFilter(f: Filter): Option[String] = { Option(f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" case EqualNullSafe(attr, value) => http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 572be82..ee6373d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -90,6 +90,11 @@ private[sql] case class JDBCRelation( override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + // Check if JDBCRDD.compileFilter can accept input filters + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { + filters.filter(JDBCRDD.compileFilter(_).isEmpty) + } + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row] JDBCRDD.scanTable( http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5186075..7a0f7ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp} import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException -import org.scalatest.BeforeAndAfter -import org.scalatest.PrivateMethodTester +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row import org.apache.spark.sql.execution.ExplainCommand +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.PhysicalRDD import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.sources._ @@ -183,26 +183,34 @@ class JDBCSuite extends SparkFunSuite } test("SELECT * WHERE (simple predicates)") { - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) + def checkPushdown(df: DataFrame): DataFrame = { + val parentPlan = df.queryExecution.executedPlan + // Check if SparkPlan Filter is removed in a physical plan and + // the plan only has PhysicalRDD to scan JDBCRelation. + assert(parentPlan.isInstanceOf[PhysicalRDD]) + assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + df + } + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'")) .collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' " + "AND THEID = 2")).collect().size == 2) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) - assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1) + assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0) // This is a test to reflect discussion in SPARK-12218. // The older versions of spark have this kind of bugs in parquet data source. @@ -210,6 +218,28 @@ class JDBCSuite extends SparkFunSuite val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')") assert(df1.collect.toSet === Set(Row("mary", 2))) assert(df2.collect.toSet === Set(Row("mary", 2))) + + def checkNotPushdown(df: DataFrame): DataFrame = { + val parentPlan = df.queryExecution.executedPlan + // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD + // cannot compile given predicates. + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] + assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter]) + df + } + assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0) + assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2) + } + + test("SELECT COUNT(1) WHERE (predicates)") { + // Check if an answer is correct when Filter is removed from operations such as count() which + // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in + // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters + // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD + // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more + // discussions. + assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1))) } test("SELECT * WHERE (quoted strings)") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org