Repository: spark
Updated Branches:
  refs/heads/master dcfe0c5cd -> bb220f657


[SPARK-10040] [SQL] Use batch insert for JDBC writing

JIRA: https://issues.apache.org/jira/browse/SPARK-10040

We should use batched inserts instead of issuing one single-row INSERT per record when writing over JDBC. Queuing rows with addBatch() and flushing them with executeBatch() every batchsize rows (1000 by default) reduces the number of round trips to the database.
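
For illustration, here is a minimal standalone sketch of the pattern the patch adopts, using plain java.sql. The table name, column list, and the writeBatched helper are hypothetical, not part of the patch:

import java.sql.{Connection, PreparedStatement}

// Hypothetical helper showing the addBatch/executeBatch pattern this patch
// introduces into savePartition. Table and columns are placeholders.
def writeBatched(conn: Connection, rows: Iterator[(Int, String)], batchSize: Int): Unit = {
  conn.setAutoCommit(false) // everything in one db transaction, as in the patch
  val stmt = conn.prepareStatement("INSERT INTO people (id, name) VALUES (?, ?)")
  try {
    var rowCount = 0
    while (rows.hasNext) {
      val (id, name) = rows.next()
      stmt.setInt(1, id)
      stmt.setString(2, name)
      stmt.addBatch()          // queue the row instead of executing it immediately
      rowCount += 1
      if (rowCount % batchSize == 0) {
        stmt.executeBatch()    // one round trip for the whole batch
        rowCount = 0
      }
    }
    if (rowCount > 0) {
      stmt.executeBatch()      // flush the final partial batch
    }
    conn.commit()
  } finally {
    stmt.close()
  }
}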

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #8273 from viirya/jdbc-insert-batch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb220f65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb220f65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb220f65

Branch: refs/heads/master
Commit: bb220f6570aa0b95598b30524224a3e82c1effbc
Parents: dcfe0c5
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Fri Aug 21 01:43:49 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Aug 21 01:43:49 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb220f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 2d0e736..26788b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -88,13 +88,15 @@ object JdbcUtils extends Logging {
       table: String,
       iterator: Iterator[Row],
       rddSchema: StructType,
-      nullTypes: Array[Int]): Iterator[Byte] = {
+      nullTypes: Array[Int],
+      batchSize: Int): Iterator[Byte] = {
     val conn = getConnection()
     var committed = false
     try {
       conn.setAutoCommit(false) // Everything in the same db transaction.
       val stmt = insertStatement(conn, table, rddSchema)
       try {
+        var rowCount = 0
         while (iterator.hasNext) {
           val row = iterator.next()
           val numFields = rddSchema.fields.length
@@ -122,7 +124,15 @@ object JdbcUtils extends Logging {
             }
             i = i + 1
           }
-          stmt.executeUpdate()
+          stmt.addBatch()
+          rowCount += 1
+          if (rowCount % batchSize == 0) {
+            stmt.executeBatch()
+            rowCount = 0
+          }
+        }
+        if (rowCount > 0) {
+          stmt.executeBatch()
         }
       } finally {
         stmt.close()
@@ -211,8 +221,9 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val driver: String = DriverRegistry.getDriverClassName(url)
     val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
+    val batchSize = properties.getProperty("batchsize", "1000").toInt
     df.foreachPartition { iterator =>
-      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
+      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize)
     }
   }
 


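For reference, a hedged usage sketch: after this change the flush size can be tuned through the "batchsize" connection property, which defaults to 1000 when unset. The JDBC URL, table names, and DataFrame below are placeholders, not taken from the patch:

import java.util.Properties

// Placeholders: adjust the URL, source table, and target table for your setup.
val props = new Properties()
props.setProperty("batchsize", "5000") // rows queued per executeBatch(); 1000 if unset

val df = sqlContext.table("people") // any DataFrame
df.write.jdbc("jdbc:postgresql://localhost/testdb", "people_copy", props)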