Repository: spark
Updated Branches:
  refs/heads/branch-1.4 9711e9bf1 -> e70be6987


[SPARK-7746][SQL] Add FetchSize parameter for JDBC driver

JIRA: https://issues.apache.org/jira/browse/SPARK-7746

This adds a `fetchSize` option for JDBC reads. It is a simple parameter to add,
but it can yield a significant performance improvement when the JDBC driver honors it.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #6283 from viirya/jdbc_fetchsize and squashes the following commits:

de47f94 [Liang-Chi Hsieh] Don't keep fetchSize as single parameter.
b7bff2f [Liang-Chi Hsieh] Add FetchSize parameter for JDBC driver.

(cherry picked from commit d0eb9ffe978c663b7aa06e908cadee81767d23d1)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e70be698
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e70be698
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e70be698

Branch: refs/heads/branch-1.4
Commit: e70be6987b2cd705bef7b97917cdee00e3e80aef
Parents: 9711e9b
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Wed May 20 22:23:49 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 20 22:24:04 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  8 +++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 33 +++++++++++++++++++-
 2 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e70be698/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index f7b1909..be03a23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -211,7 +211,8 @@ private[sql] object JDBCRDD extends Logging {
       fqTable,
       requiredColumns,
       filters,
-      parts)
+      parts,
+      properties)
   }
 }
 
@@ -227,7 +228,8 @@ private[sql] class JDBCRDD(
     fqTable: String,
     columns: Array[String],
     filters: Array[Filter],
-    partitions: Array[Partition])
+    partitions: Array[Partition],
+    properties: Properties)
   extends RDD[Row](sc, Nil) {
 
   /**
@@ -356,6 +358,8 @@ private[sql] class JDBCRDD(
     val sqlText = s"SELECT $columnList FROM $fqTable $myWhereClause"
     val stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+    val fetchSize = properties.getProperty("fetchSize", "0").toInt
+    stmt.setFetchSize(fetchSize)
     val rs = stmt.executeQuery()
 
     val conversions = getConversions(schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/e70be698/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a8dddfb..347f283 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -67,7 +67,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
-
+ 
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE fetchtwo
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
+        |         fetchSize '2')
+      """.stripMargin.replaceAll("\n", " "))
+ 
     sql(
       s"""
         |CREATE TEMPORARY TABLE parts
@@ -185,6 +193,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     assert(names(2).equals("mary"))
   }
 
+  test("SELECT first field when fetchSize is two") {
+    val names = sql("SELECT NAME FROM fetchtwo").collect().map(x => x.getString(0)).sortWith(_ < _)
+    assert(names.size === 3)
+    assert(names(0).equals("fred"))
+    assert(names(1).equals("joe 'foo' \"bar\""))
+    assert(names(2).equals("mary"))
+  }
+
   test("SELECT second field") {
     val ids = sql("SELECT THEID FROM foobar").collect().map(x => x.getInt(0)).sortWith(_ < _)
     assert(ids.size === 3)
@@ -192,6 +208,14 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     assert(ids(1) === 2)
     assert(ids(2) === 3)
   }
+ 
+  test("SELECT second field when fetchSize is two") {
+    val ids = sql("SELECT THEID FROM fetchtwo").collect().map(x => x.getInt(0)).sortWith(_ < _)
+    assert(ids.size === 3)
+    assert(ids(0) === 1)
+    assert(ids(1) === 2)
+    assert(ids(2) === 3)
+  }
 
   test("SELECT * partitioned") {
     assert(sql("SELECT * FROM parts").collect().size == 3)
@@ -232,6 +256,13 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
   }
 
+  test("Basic API with FetchSize") {
+    val properties = new Properties
+    properties.setProperty("fetchSize", "2")
+    assert(TestSQLContext.read.jdbc(
+      urlWithUserAndPass, "TEST.PEOPLE", properties).collect().length === 3)
+  }
+
   test("Partitioning via JDBCPartitioningInfo API") {
     assert(
       TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to