Repository: spark
Updated Branches:
  refs/heads/master 7d65a0db4 -> ebb9a3b6f


[SPARK-15916][SQL] JDBC filter push down should respect operator precedence

## What changes were proposed in this pull request?

This PR fixes the problem that the precedence order is messed when pushing 
where-clause expression to JDBC layer.

**Case 1:**

For sql `select * from table where (a or b) and c`, the where-clause is wrongly 
converted to JDBC where-clause `a or (b and c)` after filter push down. The 
consequence is that JDBC may returns less or more rows than expected.

**Case 2:**

For sql `select * from table where always_false_condition`, the result table 
may not be empty if the JDBC RDD is partitioned using where-clause:
```
spark.read.jdbc(url, table, predicates = Array("partition 1 where clause", 
"partition 2 where clause"...)
```

## How was this patch tested?

Unit test.

This PR also close #13640

Author: hyukjinkwon <gurwls...@gmail.com>
Author: Sean Zhong <seanzh...@databricks.com>

Closes #13743 from clockfly/SPARK-15916.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebb9a3b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebb9a3b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebb9a3b6

Branch: refs/heads/master
Commit: ebb9a3b6fd834e2c856a192b4455aab83e9c4dc8
Parents: 7d65a0d
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Fri Jun 17 17:11:38 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jun 17 17:11:38 2016 -0700

----------------------------------------------------------------------
 .../execution/datasources/jdbc/JDBCRDD.scala    |  4 +--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 26 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebb9a3b6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8d0906e..44cfbb9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -305,14 +305,14 @@ private[sql] class JDBCRDD(
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
    */
   private val filterWhereClause: String =
-    filters.flatMap(JDBCRDD.compileFilter).mkString(" AND ")
+    filters.flatMap(JDBCRDD.compileFilter).map(p => s"($p)").mkString(" AND ")
 
   /**
    * A WHERE clause representing both `filters`, if any, and the current 
partition.
    */
   private def getWhereClause(part: JDBCPartition): String = {
     if (part.whereClause != null && filterWhereClause.length > 0) {
-      "WHERE " + filterWhereClause + " AND " + part.whereClause
+      "WHERE " + s"($filterWhereClause)" + " AND " + s"(${part.whereClause})"
     } else if (part.whereClause != null) {
       "WHERE " + part.whereClause
     } else if (filterWhereClause.length > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ebb9a3b6/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index abb7918..d6ec40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -661,4 +661,30 @@ class JDBCSuite extends SparkFunSuite
     assert(oracleDialect.getJDBCType(StringType).
       map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
   }
+
+  private def assertEmptyQuery(sqlString: String): Unit = {
+    assert(sql(sqlString).collect().isEmpty)
+  }
+
+  test("SPARK-15916: JDBC filter operator push down should respect operator 
precedence") {
+    val TRUE = "NAME != 'non_exists'"
+    val FALSE1 = "THEID > 1000000000"
+    val FALSE2 = "THEID < -1000000000"
+
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE ($TRUE OR $FALSE1) AND 
$FALSE2")
+    assertEmptyQuery(s"SELECT * FROM foobar WHERE $FALSE1 AND ($FALSE2 OR 
$TRUE)")
+
+    // Tests JDBCPartition whereClause clause push down.
+    withTempTable("tempFrame") {
+      val jdbcPartitionWhereClause = s"$FALSE1 OR $TRUE"
+      val df = spark.read.jdbc(
+        urlWithUserAndPass,
+        "TEST.PEOPLE",
+        predicates = Array[String](jdbcPartitionWhereClause),
+        new Properties)
+
+      df.createOrReplaceTempView("tempFrame")
+      assertEmptyQuery(s"SELECT * FROM tempFrame where $FALSE2")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to