Repository: spark
Updated Branches:
  refs/heads/master c775bf09e -> d9a3a2a0b


[SPARK-16056][SPARK-16057][SPARK-16058][SQL] Fix Multiple Bugs in Column Partitioning in JDBC Source

#### What changes were proposed in this pull request?
This PR fixes the following bugs:

**Issue 1: Wrong results when `lowerBound` is larger than `upperBound` in column partitioning**
```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 4,
  upperBound = 0,
  numPartitions = 3,
  connectionProperties = new Properties)
```
**Before code changes:**
Both the returned results and the generated partitions are wrong:
```
Partition 0: id < 3 or id is null
Partition 1: id >= 3 AND id < 2
Partition 2: id >= 2
```
**After code changes:**
An `IllegalArgumentException` is thrown:
```
Operation not allowed: the lower bound of partitioning column is larger than the upper bound. Lower bound: 4; Upper bound: 0
```
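For reference, the new check, condensed from the `JDBCRelation.columnPartition` change in the diff below:
```scala
// Condensed excerpt of the patched columnPartition: validate the bounds up
// front instead of silently generating overlapping partitions.
val lowerBound = partitioning.lowerBound
val upperBound = partitioning.upperBound
require(lowerBound <= upperBound,
  "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
  s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
```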
**Issue 2: `numPartitions` exceeds the number of key values between the lower and upper bounds**
```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 1,
  upperBound = 5,
  numPartitions = 10,
  connectionProperties = new Properties)
```
**Before code changes:**
The returned results are correct, but the generated partitions are wasteful: eight of the ten scan no rows:
```
Partition 0: id < 1 or id is null
Partition 1: id >= 1 AND id < 1
Partition 2: id >= 1 AND id < 1
Partition 3: id >= 1 AND id < 1
Partition 4: id >= 1 AND id < 1
Partition 5: id >= 1 AND id < 1
Partition 6: id >= 1 AND id < 1
Partition 7: id >= 1 AND id < 1
Partition 8: id >= 1 AND id < 1
Partition 9: id >= 1
```
**After code changes:**
`numPartitions` is reduced to fit the bound range (here, from 10 to 4), and the correct results are returned:
```
Partition 0: id < 2 or id is null
Partition 1: id >= 2 AND id < 3
Partition 2: id >= 3 AND id < 4
Partition 3: id >= 4
```
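The clamping that produces this, condensed from the patch (log-warning text elided); the inline arithmetic assumes the example inputs above:
```scala
// Condensed excerpt of the patched columnPartition, assuming lowerBound = 1,
// upperBound = 5, and a requested numPartitions of 10.
val numPartitions =
  if ((upperBound - lowerBound) >= partitioning.numPartitions) {
    partitioning.numPartitions   // the requested count fits the range
  } else {
    upperBound - lowerBound      // here: 5 - 1 = 4 partitions (a warning is logged)
  }
// Divide before subtracting to avoid Long overflow: stride = 5/4 - 1/4 = 1.
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
```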
**Issue 3: `java.lang.ArithmeticException` when `numPartitions` is zero**
```scala
spark.read.jdbc(
  url = urlWithUserAndPass,
  table = "TEST.seq",
  columnName = "id",
  lowerBound = 0,
  upperBound = 4,
  numPartitions = 0,
  connectionProperties = new Properties)
```
**Before code changes:**
The following exception is thrown:
```
  java.lang.ArithmeticException: / by zero
```
**After code changes:**
The correct result is returned: column partitioning is disabled when `numPartitions` is less than or equal to one, which covers the zero (and negative) case.
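The guard, condensed from the diff below, also covers the pre-existing `numPartitions == 1` case and equal bounds:
```scala
// Early return in the patched columnPartition: one partition with no WHERE
// clause whenever the range cannot (or need not) be split.
if (partitioning == null || partitioning.numPartitions <= 1 ||
    partitioning.lowerBound == partitioning.upperBound) {
  return Array[Partition](JDBCPartition(null, 0))
}
```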

#### How was this patch tested?
Added test cases to `JDBCSuite` to verify the results

Author: gatorsmile <gatorsm...@gmail.com>

Closes #13773 from gatorsmile/jdbcPartitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9a3a2a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9a3a2a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9a3a2a0

Branch: refs/heads/master
Commit: d9a3a2a0bec504d17d3b94104d449ee3bd850120
Parents: c775bf0
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Jun 20 21:49:33 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jun 20 21:49:33 2016 -0700

----------------------------------------------------------------------
 .../datasources/jdbc/JDBCRelation.scala         | 48 ++++++++++-----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 65 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9a3a2a0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 233b789..11613dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
@@ -36,7 +37,7 @@ private[sql] case class JDBCPartitioningInfo(
     upperBound: Long,
     numPartitions: Int)
 
-private[sql] object JDBCRelation {
+private[sql] object JDBCRelation extends Logging {
   /**
    * Given a partitioning schematic (a column of integral type, a number of
    * partitions, and upper and lower bounds on the column's value), generate
@@ -52,29 +53,46 @@ private[sql] object JDBCRelation {
    * @return an array of partitions with where clause for each partition
    */
   def columnPartition(partitioning: JDBCPartitioningInfo): Array[Partition] = {
-    if (partitioning == null) return Array[Partition](JDBCPartition(null, 0))
+    if (partitioning == null || partitioning.numPartitions <= 1 ||
+      partitioning.lowerBound == partitioning.upperBound) {
+      return Array[Partition](JDBCPartition(null, 0))
+    }
 
-    val numPartitions = partitioning.numPartitions
-    val column = partitioning.column
-    if (numPartitions == 1) return Array[Partition](JDBCPartition(null, 0))
+    val lowerBound = partitioning.lowerBound
+    val upperBound = partitioning.upperBound
+    require (lowerBound <= upperBound,
+      "Operation not allowed: the lower bound of partitioning column is larger 
than the upper " +
+      s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
+
+    val numPartitions =
+      if ((upperBound - lowerBound) >= partitioning.numPartitions) {
+        partitioning.numPartitions
+      } else {
+        logWarning("The number of partitions is reduced because the specified number of " +
+          "partitions is less than the difference between upper bound and lower bound. " +
+          s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " +
+          s"partitions: ${partitioning.numPartitions}; Lower bound: $lowerBound; " +
+          s"Upper bound: $upperBound.")
+        upperBound - lowerBound
+      }
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
-    val stride: Long = (partitioning.upperBound / numPartitions
-                      - partitioning.lowerBound / numPartitions)
+    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
+    val column = partitioning.column
     var i: Int = 0
-    var currentValue: Long = partitioning.lowerBound
+    var currentValue: Long = lowerBound
     var ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
-      val lowerBound = if (i != 0) s"$column >= $currentValue" else null
+      val lBound = if (i != 0) s"$column >= $currentValue" else null
       currentValue += stride
-      val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
+      val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
       val whereClause =
-        if (upperBound == null) {
-          lowerBound
-        } else if (lowerBound == null) {
-          s"$upperBound or $column is null"
+        if (uBound == null) {
+          lBound
+        } else if (lBound == null) {
+          s"$uBound or $column is null"
         } else {
-          s"$lowerBound AND $upperBound"
+          s"$lBound AND $uBound"
         }
       ans += JDBCPartition(whereClause, i)
       i = i + 1

http://git-wip-us.apache.org/repos/asf/spark/blob/d9a3a2a0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d6ec40c..fd6671a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -184,6 +184,16 @@ class JDBCSuite extends SparkFunSuite
       "insert into test.emp values ('kathy', null, null)").executeUpdate()
     conn.commit()
 
+    conn.prepareStatement(
+      "create table test.seq(id INTEGER)").executeUpdate()
+    (0 to 6).foreach { value =>
+      conn.prepareStatement(
+        s"insert into test.seq values ($value)").executeUpdate()
+    }
+    conn.prepareStatement(
+      "insert into test.seq values (null)").executeUpdate()
+    conn.commit()
+
     sql(
       s"""
          |CREATE TEMPORARY TABLE nullparts
@@ -373,6 +383,61 @@ class JDBCSuite extends SparkFunSuite
         .collect().length === 4)
   }
 
+  test("Partitioning on column where numPartitions is zero") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 0,
+      upperBound = 4,
+      numPartitions = 0,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where numPartitions are more than the number of 
total rows") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 1,
+      upperBound = 5,
+      numPartitions = 10,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where lowerBound is equal to upperBound") {
+    val res = spark.read.jdbc(
+      url = urlWithUserAndPass,
+      table = "TEST.seq",
+      columnName = "id",
+      lowerBound = 5,
+      upperBound = 5,
+      numPartitions = 4,
+      connectionProperties = new Properties
+    )
+    assert(res.count() === 8)
+  }
+
+  test("Partitioning on column where lowerBound is larger than upperBound") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read.jdbc(
+        url = urlWithUserAndPass,
+        table = "TEST.seq",
+        columnName = "id",
+        lowerBound = 5,
+        upperBound = 1,
+        numPartitions = 3,
+        connectionProperties = new Properties
+      )
+    }.getMessage
+    assert(e.contains("Operation not allowed: the lower bound of partitioning column " +
+      "is larger than the upper bound. Lower bound: 5; Upper bound: 1"))
+  }
+
   test("SELECT * on partitioned table with a nullable partition column") {
     assert(sql("SELECT * FROM nullparts").collect().size == 4)
   }

